Compare commits

...

34 Commits

Author SHA1 Message Date
miconis 3f2d3253e4 Merge branch 'stable_ids' into deduptesting 4 years ago
miconis 1699d41d39 relations for openorgs: now it chooses only one master 4 years ago
Claudio Atzori e5da4ee9b1 dedup workflow using the common PidComparator 4 years ago
Claudio Atzori ea2a0ea949 IdentifierFactory considers only DOIs matching a given regex 4 years ago
Claudio Atzori 86d6fbe95b refactoring: CleaningFunctions and OafMapperUtils moved to dhp-common 4 years ago
Claudio Atzori 8471888ad3 Merge branch 'graph_cleaning' into stable_ids 4 years ago
Claudio Atzori 3fcd669e99 result merge operation leverages the custom ResultTypeComparator in the aggregator graph construction 4 years ago
Claudio Atzori 78c3c1b62b exclude pid values set to 'none' 4 years ago
Claudio Atzori 8e7f81c5f5 code formatting 4 years ago
Claudio Atzori 09e44dabff Merge branch 'master' into stable_ids 4 years ago
Claudio Atzori 385214eeae code formatting 4 years ago
Claudio Atzori 04ad8969b2 anticipated execution of the graph cleaning workflow 4 years ago
Claudio Atzori 4ca75d6951 Merge pull request 'Dedup ID creation policy' (#48) from deduptesting into stable_ids 4 years ago
Claudio Atzori 58f28296ea ProvisionConstants renamed to ModelHardLimits, moved to dhp-common and applied to truncate long abstracts (len > 150000). Further filtering for empty PID values 4 years ago
miconis c4a59d1b9a merge with the master to port the new packages 4 years ago
miconis 708d887e64 minor changes 4 years ago
miconis 0e54803177 bug fix in the id generator and implementation of jobs for organization dedup 4 years ago
Claudio Atzori 266bf1a221 common IdentifierFactory in use on the mapping from the aggregator data; merge the entities sharing the same id; code formatting 4 years ago
Claudio Atzori 34f1d0904b common IdentifierFactory in use on the mapping from the aggregator data 4 years ago
Claudio Atzori c188868450 Merge branch 'master' into stable_ids 4 years ago
Claudio Atzori 3e6c8bca39 Merge branch 'master' into stable_ids 4 years ago
miconis 6f8720982c bug fix in the idgenerator and test implementation 4 years ago
Claudio Atzori 8958f20813 code formatting 4 years ago
Claudio Atzori 1abcabb6e6 WIP stable ids: IdentifierFactory & unit test 4 years ago
miconis 1804c5d809 refactoring: classes moved in the right package 4 years ago
miconis 7093355487 bug fix and minor changes 4 years ago
Claudio Atzori 642b459552 Merge branch 'master' of https://code-repo.d4science.org/D-Net/dnet-hadoop into stable_ids 4 years ago
Claudio Atzori 6ce340bd3d WIP stable ids: IdentifierFactory 4 years ago
miconis a2ac7e52fb implementation of the workflow for new organizations in openorgs 4 years ago
miconis e3f7798d1b minor changes in dedup tests, bug fix in the idgenerator and pace-core version update 4 years ago
miconis 4cf79f32eb implementation of the oozie wf to prepare the openorgs input: relations between organizations 4 years ago
miconis 259362ef47 implementation of the job to collect simrels from postgres db 4 years ago
miconis d47352cbc7 refactoring of the procedure for the id generation, minor changes and addition of a comparison on the original id and the origin datasource 4 years ago
miconis b260fee787 implementation of the dedup_id generation using pids to make the graph more stable 4 years ago

@ -29,6 +29,12 @@
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>

@ -1,8 +1,9 @@
package eu.dnetlib.dhp.oa.graph.clean;
package eu.dnetlib.dhp.schema.oaf;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -10,13 +11,11 @@ import org.apache.commons.lang3.StringUtils;
import com.clearspring.analytics.util.Lists;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
public class CleaningFunctions {
public static final String DOI_URL_PREFIX_REGEX = "(^http(s?):\\/\\/)(((dx\\.)?doi\\.org)|(handle\\.test\\.datacite\\.org))\\/";
public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/";
public static final String NONE = "none";
@ -72,7 +71,7 @@ public class CleaningFunctions {
return value;
}
protected static <T extends Oaf> T fixDefaults(T value) {
public static <T extends Oaf> T fixDefaults(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
@ -109,20 +108,17 @@ public class CleaningFunctions {
}
if (Objects.nonNull(r.getPid())) {
r
.setPid(
r
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
.filter(sp -> !NONE.equalsIgnoreCase(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(sp -> {
sp.setValue(StringUtils.trim(sp.getValue()));
return sp;
})
.collect(Collectors.toList()));
.setPid(
r
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
.filter(sp -> !NONE.equalsIgnoreCase(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(CleaningFunctions::normalizePidValue)
.collect(Collectors.toList()));
}
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
r
@ -143,7 +139,7 @@ public class CleaningFunctions {
}
}
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
Qualifier bestaccessrights = OafMapperUtils.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
r
.setBestaccessright(
@ -219,4 +215,24 @@ public class CleaningFunctions {
classid, classname, scheme, scheme);
}
/**
* Utility method that normalises PID values on a per-type basis.
* @param pid the PID whose value will be normalised.
* @return the PID containing the normalised value.
*/
public static StructuredProperty normalizePidValue(StructuredProperty pid) {
String value = Optional
.ofNullable(pid.getValue())
.map(String::trim)
.orElseThrow(() -> new IllegalArgumentException("PID value cannot be empty"));
switch (pid.getQualifier().getClassid()) {
// TODO add cleaning for more PID types as needed
case "doi":
pid.setValue(value.toLowerCase().replaceAll(DOI_URL_PREFIX_REGEX, ""));
break;
}
return pid;
}
}
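For reference, normalizePidValue trims the value and, for DOI pids, lowercases it and strips the resolver prefix matched by DOI_URL_PREFIX_REGEX. A minimal usage sketch (assuming StructuredProperty and Qualifier expose the usual bean setters, e.g. setQualifier, which are not shown in this diff):

import eu.dnetlib.dhp.schema.oaf.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

public class NormalizePidSketch {
	public static void main(String[] args) {
		// hypothetical DOI pid expressed as a resolver URL, with stray whitespace
		Qualifier doi = new Qualifier();
		doi.setClassid("doi");

		StructuredProperty pid = new StructuredProperty();
		pid.setQualifier(doi); // assumed bean setter
		pid.setValue(" https://doi.org/10.1016/J.CMET.2011.03.013 ");

		// trim + lowercase + strip the "https://doi.org/" prefix
		String normalized = CleaningFunctions.normalizePidValue(pid).getValue();
		System.out.println(normalized); // expected: 10.1016/j.cmet.2011.03.013
	}
}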

@ -1,14 +1,14 @@
package eu.dnetlib.dhp.oa.provision;
package eu.dnetlib.dhp.schema.oaf;
public class ProvisionConstants {
public class ModelHardLimits {
public static final int MAX_EXTERNAL_ENTITIES = 50;
public static final int MAX_AUTHORS = 200;
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
public static final int MAX_TITLE_LENGTH = 5000;
public static final int MAX_TITLES = 10;
public static final int MAX_ABSTRACT_LENGTH = 100000;
public static final int MAX_ABSTRACT_LENGTH = 150000;
public static final int MAX_INSTANCES = 10;
}

@ -1,11 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
package eu.dnetlib.dhp.schema.oaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
@ -13,15 +12,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.common.LicenseComparator;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {
@ -270,4 +261,36 @@ public class OafMapperUtils {
final Map<Object, Boolean> seen = new ConcurrentHashMap<>();
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}
public static Qualifier createBestAccessRights(final List<Instance> instanceList) {
return getBestAccessRights(instanceList);
}
protected static Qualifier getBestAccessRights(final List<Instance> instanceList) {
if (instanceList != null) {
final Optional<Qualifier> min = instanceList
.stream()
.map(i -> i.getAccessright())
.min(new LicenseComparator());
final Qualifier rights = min.isPresent() ? min.get() : new Qualifier();
if (StringUtils.isBlank(rights.getClassid())) {
rights.setClassid(UNKNOWN);
}
if (StringUtils.isBlank(rights.getClassname())
|| UNKNOWN.equalsIgnoreCase(rights.getClassname())) {
rights.setClassname(NOT_AVAILABLE);
}
if (StringUtils.isBlank(rights.getSchemeid())) {
rights.setSchemeid(DNET_ACCESS_MODES);
}
if (StringUtils.isBlank(rights.getSchemename())) {
rights.setSchemename(DNET_ACCESS_MODES);
}
return rights;
}
return null;
}
}

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.schema.oaf;
import java.util.Comparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class ResultTypeComparator implements Comparator<Result> {
@Override
public int compare(Result left, Result right) {
if (left == null && right == null)
return 0;
if (left == null)
return 1;
if (right == null)
return -1;
String lClass = left.getResulttype().getClassid();
String rClass = right.getResulttype().getClassid();
if (lClass.equals(rClass))
return 0;
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return 1;
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
}

@ -0,0 +1,102 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import eu.dnetlib.dhp.schema.oaf.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
/**
* Factory class for OpenAIRE identifiers in the Graph
*/
public class IdentifierFactory implements Serializable {
public static final String ID_SEPARATOR = "::";
public static final String ID_PREFIX_SEPARATOR = "|";
public final static String ID_REGEX = "^[0-9][0-9]\\" + ID_PREFIX_SEPARATOR + ".{12}" + ID_SEPARATOR
+ "[a-zA-Z0-9]{32}$";
public final static String DOI_REGEX = "(^10\\.[0-9]{4,9}\\/[-._;()\\/:a-zA-Z0-9]+$)|" +
"(^10\\.1002\\/[^\\s]+$)|" +
"(^10\\.1021\\/[a-zA-Z0-9_][a-zA-Z0-9_][0-9]++$)|" +
"(^10\\.1207\\/[a-zA-Z0-9_]+\\&[0-9]+_[0-9]+$)";
public static final int ID_PREFIX_LEN = 12;
public static final String NONE = "none";
/**
* Creates an identifier from the most relevant PID (if available) in the given entity T. Returns entity.id
* when no PID is available
* @param entity the entity providing PIDs and a default ID.
* @param <T> the specific entity type. Currently Organization and Result subclasses are supported.
* @return an identifier from the most relevant PID, entity.id otherwise
*/
public static <T extends OafEntity> String createIdentifier(T entity) {
if (Objects.isNull(entity.getPid()) || entity.getPid().isEmpty()) {
return entity.getId();
}
return entity
.getPid()
.stream()
.filter(s -> pidFilter(s))
.min(new PidComparator<>(entity))
.map(s -> idFromPid(entity, s))
.map(IdentifierFactory::verifyIdSyntax)
.orElseGet(entity::getId);
}
protected static boolean pidFilter(StructuredProperty s) {
if (Objects.isNull(s.getQualifier()) ||
StringUtils.isBlank(StringUtils.trim(s.getValue()))) {
return false;
}
try {
switch (PidType.valueOf(s.getQualifier().getClassid())) {
case doi:
final String doi = StringUtils.trim(StringUtils.lowerCase(s.getValue()));
return doi.matches(DOI_REGEX);
default:
return true;
}
} catch (IllegalArgumentException e) {
return false;
}
}
private static String verifyIdSyntax(String s) {
if (StringUtils.isBlank(s) || !s.matches(ID_REGEX)) {
throw new RuntimeException(String.format("malformed id: '%s'", s));
} else {
return s;
}
}
private static <T extends OafEntity> String idFromPid(T entity, StructuredProperty s) {
return new StringBuilder()
.append(StringUtils.substringBefore(entity.getId(), ID_PREFIX_SEPARATOR))
.append(ID_PREFIX_SEPARATOR)
.append(createPrefix(s.getQualifier().getClassid()))
.append(ID_SEPARATOR)
.append(DHPUtils.md5(CleaningFunctions.normalizePidValue(s).getValue()))
.toString();
}
// create the prefix (length = 12)
private static String createPrefix(String pidType) {
StringBuilder prefix = new StringBuilder(StringUtils.left(pidType, ID_PREFIX_LEN));
while (prefix.length() < ID_PREFIX_LEN) {
prefix.append("_");
}
return prefix.substring(0, ID_PREFIX_LEN);
}
}
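In practice the minted identifier keeps the two-digit prefix of the original id, pads the pid type to 12 characters and hashes the normalized pid value. A rough sketch, assuming Publication exposes the usual setId/setPid setters:

import java.util.Collections;

import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;

public class IdentifierFactorySketch {
	public static void main(String[] args) {
		Publication p = new Publication();
		p.setId("50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f"); // fallback id

		Qualifier doi = new Qualifier();
		doi.setClassid("doi");
		StructuredProperty pid = new StructuredProperty();
		pid.setQualifier(doi); // assumed bean setter
		pid.setValue("10.1016/j.cmet.2011.03.013");
		p.setPid(Collections.singletonList(pid));

		// "50" prefix kept, "doi" padded to 12 chars, md5 of the normalized DOI value
		System.out.println(IdentifierFactory.createIdentifier(p));
		// expected: 50|doi_________::<md5 of "10.1016/j.cmet.2011.03.013">
	}
}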

@ -0,0 +1,27 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.Comparator;
public class OrganizationPidComparator implements Comparator<PidType> {
@Override
public int compare(PidType pLeft, PidType pRight) {
if (pLeft.equals(PidType.GRID))
return -1;
if (pRight.equals(PidType.GRID))
return 1;
if (pLeft.equals(PidType.mag_id))
return -1;
if (pRight.equals(PidType.mag_id))
return 1;
if (pLeft.equals(PidType.urn))
return -1;
if (pRight.equals(PidType.urn))
return 1;
return 0;
}
}

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.Comparator;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class PidComparator<T extends OafEntity> implements Comparator<StructuredProperty> {
private T entity;
public PidComparator(T entity) {
this.entity = entity;
}
@Override
public int compare(StructuredProperty left, StructuredProperty right) {
if (left == null && right == null)
return 0;
if (left == null)
return 1;
if (right == null)
return -1;
PidType lClass = PidType.valueOf(left.getQualifier().getClassid());
PidType rClass = PidType.valueOf(right.getQualifier().getClassid());
if (lClass.equals(rClass))
return 0;
if (ModelSupport.isSubClass(entity, Result.class)) {
return compareResultPids(lClass, rClass);
}
if (ModelSupport.isSubClass(entity, Organization.class)) {
return compareOrganizationtPids(lClass, rClass);
}
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
private int compareResultPids(PidType lClass, PidType rClass) {
return new ResultPidComparator().compare(lClass, rClass);
}
private int compareOrganizationtPids(PidType lClass, PidType rClass) {
return new OrganizationPidComparator().compare(lClass, rClass);
}
}

@ -0,0 +1,29 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import org.apache.commons.lang3.EnumUtils;
public enum PidType {
// Result
doi, pmid, pmc, handle, arXiv, NCID, GBIF, nct, pdb,
// Organization
GRID, mag_id, urn,
// Used by dedup
undefined, original;
public static boolean isValid(String type) {
return EnumUtils.isValidEnum(PidType.class, type);
}
public static PidType tryValueOf(String s) {
try {
return PidType.valueOf(s);
} catch (Exception e) {
return PidType.original;
}
}
}
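tryValueOf is the lenient counterpart of valueOf: any classid that does not match one of the enum constants falls back to original instead of throwing. A small sketch:

import eu.dnetlib.dhp.schema.oaf.utils.PidType;

public class PidTypeSketch {
	public static void main(String[] args) {
		System.out.println(PidType.tryValueOf("doi"));        // doi
		System.out.println(PidType.tryValueOf("scp-number")); // original (unknown type)
		System.out.println(PidType.isValid("GRID"));          // true
	}
}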

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.Comparator;
public class ResultPidComparator implements Comparator<PidType> {
@Override
public int compare(PidType pLeft, PidType pRight) {
if (pLeft.equals(PidType.doi))
return -1;
if (pRight.equals(PidType.doi))
return 1;
if (pLeft.equals(PidType.pmid))
return -1;
if (pRight.equals(PidType.pmid))
return 1;
if (pLeft.equals(PidType.pmc))
return -1;
if (pRight.equals(PidType.pmc))
return 1;
if (pLeft.equals(PidType.handle))
return -1;
if (pRight.equals(PidType.handle))
return 1;
if (pLeft.equals(PidType.arXiv))
return -1;
if (pRight.equals(PidType.arXiv))
return 1;
if (pLeft.equals(PidType.NCID))
return -1;
if (pRight.equals(PidType.NCID))
return 1;
if (pLeft.equals(PidType.GBIF))
return -1;
if (pRight.equals(PidType.GBIF))
return 1;
if (pLeft.equals(PidType.nct))
return -1;
if (pRight.equals(PidType.nct))
return 1;
if (pLeft.equals(PidType.urn))
return -1;
if (pRight.equals(PidType.urn))
return 1;
return 0;
}
}

@ -0,0 +1,47 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.utils.DHPUtils;
public class IdentifierFactoryTest {
private static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@Test
public void testCreateIdentifierForPublication() throws IOException {
verifyIdentifier("publication_doi.json", "50|doi_________::" + DHPUtils.md5("10.1016/j.cmet.2011.03.013"));
verifyIdentifier("publication_pmc.json", "50|pmc_________::" + DHPUtils.md5("21459329"));
verifyIdentifier(
"publication_urn.json",
"50|urn_________::" + DHPUtils.md5("urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"));
final String defaultID = "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f";
verifyIdentifier("publication_3.json", defaultID);
verifyIdentifier("publication_4.json", defaultID);
verifyIdentifier("publication_5.json", defaultID);
}
protected void verifyIdentifier(String filename, String expectedID) throws IOException {
final String json = IOUtils.toString(getClass().getResourceAsStream(filename));
final Publication pub = OBJECT_MAPPER.readValue(json, Publication.class);
String id = IdentifierFactory.createIdentifier(pub);
assertNotNull(id);
assertEquals(expectedID, id);
}
}

@ -0,0 +1 @@
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"scp-number"},"value":"79953761260"}]}

@ -0,0 +1 @@
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[]}

@ -0,0 +1 @@
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f"}

@ -0,0 +1 @@
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"doi"},"value":"10.1016/j.cmet.2011.03.013"},{"qualifier":{"classid":"urn"},"value":"urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"},{"qualifier":{"classid":"scp-number"},"value":"79953761260"},{"qualifier":{"classid":"pmc"},"value":"21459329"}]}

@ -0,0 +1 @@
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"urn"},"value":"urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"},{"qualifier":{"classid":"scp-number"},"value":"79953761260"},{"qualifier":{"classid":"pmc"},"value":"21459329"}]}

@ -0,0 +1 @@
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"urn"},"value":"urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"},{"qualifier":{"classid":"scp-number"},"value":"79953761260"},{"qualifier":{"classid":"pmcid"},"value":"21459329"}]}

@ -9,7 +9,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@Disabled
public class HttpConnectorTest {

@ -90,6 +90,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>

@ -29,6 +29,7 @@ import eu.dnetlib.pace.config.DedupConfig;
abstract class AbstractSparkAction implements Serializable {
protected static final int NUM_PARTITIONS = 1000;
protected static final int NUM_CONNECTIONS = 20;
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

@ -18,7 +18,7 @@ import eu.dnetlib.dhp.schema.oaf.Field;
public class DatePicker {
private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
public static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
private static final String DATE_DEFAULT_SUFFIX = "01-01";
private static final int YEAR_LB = 1300;
private static final int YEAR_UB = Year.now().getValue() + 5;
@ -114,7 +114,7 @@ public class DatePicker {
}
}
private static boolean inRange(final String date) {
public static boolean inRange(final String date) {
final int year = Integer.parseInt(substringBefore(date, "-"));
return year >= YEAR_LB && year <= YEAR_UB;
}

@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@ -81,11 +82,16 @@ public class DedupRecordFactory {
final Collection<String> dates = Lists.newArrayList();
final List<List<Author>> authors = Lists.newArrayList();
final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
entities
.forEachRemaining(
t -> {
T duplicate = t._2();
// prepare the list of pids to use for the id generation
bestPids.add(Identifier.newInstance(duplicate));
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
@ -94,6 +100,7 @@ public class DedupRecordFactory {
if (r1.getDateofacceptance() != null)
dates.add(r1.getDateofacceptance().getValue());
}
});
// set authors and date
@ -102,10 +109,12 @@ public class DedupRecordFactory {
((Result) entity).setAuthor(AuthorMerger.merge(authors));
}
entity.setId(id);
entity.setId(IdGenerator.generate(bestPids, id));
entity.setLastupdatetimestamp(ts);
entity.setDataInfo(dataInfo);
return entity;
}
}

@ -70,17 +70,6 @@ public class DedupUtility {
return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf));
}
public static String md5(final String s) {
try {
final MessageDigest md = MessageDigest.getInstance("MD5");
md.update(s.getBytes(StandardCharsets.UTF_8));
return new String(Hex.encodeHex(md.digest()));
} catch (final Exception e) {
System.err.println("Error creating id");
return null;
}
}
public static String createDedupRecordPath(
final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_deduprecord", basePath, actionSetId, entityType);

@ -0,0 +1,46 @@
package eu.dnetlib.dhp.oa.dedup;
import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBefore;
import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
public class IdGenerator implements Serializable {
// pick the best pid from the list (consider date and pidtype)
public static <T extends OafEntity> String generate(List<Identifier<T>> pids, String defaultID) {
if (pids == null || pids.size() == 0)
return defaultID;
Identifier<T> bp = pids
.stream()
.min(Identifier::compareTo)
.get();
String prefix = substringBefore(bp.getOriginalID(), "|");
String ns = substringBefore(substringAfter(bp.getOriginalID(), "|"), "::");
String suffix = substringAfter(bp.getOriginalID(), "::");
final String pidType = substringBefore(ns, "_");
if (PidType.isValid(pidType)) {
return prefix + "|" + dedupify(ns) + "::" + suffix;
} else {
return prefix + "|dedup_wf_001::" + suffix;
}
}
private static String dedupify(String ns) {
StringBuilder prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
while (prefix.length() < 12) {
prefix.append("_");
}
return prefix.substring(0, 12);
}
}
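The generated dedup id keeps the prefix and the suffix of the best record and only rewrites the namespace: "<pidtype>_dedup___" (padded to 12 characters) when the best record's namespace starts with a known pid type, "dedup_wf_001" otherwise. A minimal sketch, reusing Identifier.newInstance as DedupRecordFactory does:

import java.util.ArrayList;
import java.util.List;

import eu.dnetlib.dhp.oa.dedup.IdGenerator;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class IdGeneratorSketch {
	public static void main(String[] args) {
		String defaultId = "50|dedup_wf_001::0829b5191605bdbea36d6502b8c1ce1f";

		// best record minted from a DOI -> namespace becomes "doi_dedup___"
		Publication best = new Publication();
		best.setId("50|doi_________::0829b5191605bdbea36d6502b8c1ce1f");

		List<Identifier<Publication>> pids = new ArrayList<>();
		pids.add(Identifier.newInstance(best)); // unchecked, as in DedupRecordFactory

		System.out.println(IdGenerator.generate(pids, defaultId));
		// expected: 50|doi_dedup___::0829b5191605bdbea36d6502b8c1ce1f

		// with no candidate pids the default dedup id is returned unchanged
		System.out.println(IdGenerator.generate(new ArrayList<Identifier<Publication>>(), defaultId));
	}
}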

@ -0,0 +1,184 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import scala.Tuple2;
public class SparkCollectSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
Dataset<Row> simGroupsDS;
Dataset<Row> groupsDS;
public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset<Row> simGroupsDS,
Dataset<Row> groupsDS) {
super(parser, spark);
this.simGroupsDS = simGroupsDS;
this.groupsDS = groupsDS;
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkBlockStats.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
final String dbPassword = parser.get("postgresPassword");
SparkSession spark = getSparkSession(conf);
DataFrameReader readOptions = spark
.read()
.format("jdbc")
.option("url", dbUrl)
.option("user", dbUser)
.option("password", dbPassword);
new SparkCollectSimRels(
parser,
spark,
readOptions.option("dbtable", "similarity_groups").load(),
readOptions.option("dbtable", "groups").load())
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException {
// read oozie parameters
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
log.info("numPartitions: '{}'", numPartitions);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("postgresUser: {}", dbUser);
log.info("postgresUrl: {}", dbUrl);
log.info("postgresPassword: xxx");
JavaPairRDD<String, List<String>> similarityGroup = simGroupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
.groupByKey()
.mapToPair(
i -> new Tuple2<>(i._1(), StreamSupport
.stream(i._2().spliterator(), false)
.collect(Collectors.toList())));
JavaPairRDD<String, String> groupIds = groupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
JavaRDD<Tuple2<Tuple2<String, String>, List<String>>> groups = similarityGroup
.leftOuterJoin(groupIds)
.filter(g -> g._2()._2().isPresent())
.map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
JavaRDD<Relation> relations = groups.flatMap(g -> {
String firstId = g._2().get(0);
List<Relation> rels = new ArrayList<>();
for (String id : g._2()) {
if (!firstId.equals(id))
rels.add(createSimRel(firstId, id, g._1()._2()));
}
return rels.iterator();
});
Dataset<Relation> resultRelations = spark
.createDataset(
relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
Encoders.bean(Relation.class))
.repartition(numPartitions);
Dataset<Relation> organizationRelations = spark
.createDataset(
relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
Encoders.bean(Relation.class))
.repartition(numPartitions);
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
switch (dedupConf.getWf().getSubEntityValue()) {
case "organization":
savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
break;
default:
savePostgresRelation(
resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue());
break;
}
}
}
private Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation();
r.setSubRelType("dedupSimilarity");
r.setRelClass("isSimilarTo");
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":
r.setSource("50|" + source);
r.setTarget("50|" + target);
r.setRelType("resultResult");
break;
case "organization":
r.setSource("20|" + source);
r.setTarget("20|" + target);
r.setRelType("organizationOrganization");
break;
default:
throw new IllegalArgumentException("unmanaged entity type: " + entity);
}
return r;
}
private void savePostgresRelation(Dataset<Relation> newRelations, String workingPath, String actionSetId,
String entityType) {
newRelations
.write()
.mode(SaveMode.Append)
.parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
}
}

@ -106,10 +106,8 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final RDD<Edge<String>> edgeRdd = spark
.read()
.textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.map(
(MapFunction<String, Relation>) r -> OBJECT_MAPPER.readValue(r, Relation.class),
Encoders.bean(Relation.class))
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.as(Encoders.bean(Relation.class))
.javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd();

@ -100,12 +100,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
.repartition(numPartitions);
// create relations by comparing only elements in the same group
Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath);
spark
.createDataset(
Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
.rdd(),
Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.parquet(outputPath);
}
}

@ -0,0 +1,170 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
public class SparkPrepareNewOrgs extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkPrepareNewOrgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
new SparkPrepareNewOrgs(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService) throws IOException {
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numConnections = Optional
.ofNullable(parser.get("numConnections"))
.map(Integer::valueOf)
.orElse(NUM_CONNECTIONS);
final String apiUrl = Optional
.ofNullable(parser.get("apiUrl"))
.orElse("");
final String dbUrl = parser.get("dbUrl");
final String dbTable = parser.get("dbTable");
final String dbUser = parser.get("dbUser");
final String dbPwd = parser.get("dbPwd");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("numPartitions: '{}'", numConnections);
log.info("apiUrl: '{}'", apiUrl);
log.info("dbUrl: '{}'", dbUrl);
log.info("dbUser: '{}'", dbUser);
log.info("table: '{}'", dbTable);
log.info("dbPwd: '{}'", "xxx");
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
Dataset<OrgSimRel> newOrgs = createNewOrgs(spark, mergeRelPath, entityPath);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
newOrgs
.repartition(numConnections)
.write()
.mode(SaveMode.Append)
.jdbc(dbUrl, dbTable, connectionProperties);
if (!apiUrl.isEmpty())
updateSimRels(apiUrl);
}
public static Dataset<OrgSimRel> createNewOrgs(
final SparkSession spark,
final String mergeRelsPath,
final String entitiesPath) {
// <id, json_entity>
Dataset<Tuple2<String, Organization>> entities = spark
.read()
.textFile(entitiesPath)
.map(
(MapFunction<String, Tuple2<String, Organization>>) it -> {
Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
Dataset<Tuple2<String, String>> mergerels = spark
.createDataset(
spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'isMergedIn'")
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
.rdd(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
return entities
.joinWith(mergerels, entities.col("_1").equalTo(mergerels.col("_1")), "left")
.filter((FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> t._2() == null)
.filter(
(FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> !t
._1()
._1()
.contains("openorgs"))
.map(
(MapFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>, OrgSimRel>) r -> new OrgSimRel(
"",
r._1()._2().getOriginalId().get(0),
r._1()._2().getLegalname() != null ? r._1()._2().getLegalname().getValue() : "",
r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "",
r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "",
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
r._1()._2().getCollectedfrom().get(0).getValue(), ""),
Encoders.bean(OrgSimRel.class));
}
private static String updateSimRels(final String apiUrl) throws IOException {
log.info("Updating simrels on the portal");
final HttpGet req = new HttpGet(apiUrl);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
}

@ -0,0 +1,265 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
import scala.Tuple3;
public class SparkPrepareOrgRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
new SparkPrepareOrgRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService) throws IOException {
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numConnections = Optional
.ofNullable(parser.get("numConnections"))
.map(Integer::valueOf)
.orElse(NUM_CONNECTIONS);
final String dbUrl = parser.get("dbUrl");
final String dbTable = parser.get("dbTable");
final String dbUser = parser.get("dbUser");
final String dbPwd = parser.get("dbPwd");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("numPartitions: '{}'", numConnections);
log.info("dbUrl: '{}'", dbUrl);
log.info("dbUser: '{}'", dbUser);
log.info("table: '{}'", dbTable);
log.info("dbPwd: '{}'", "xxx");
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
Dataset<OrgSimRel> relations = createRelations(spark, mergeRelPath, entityPath);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
relations
.repartition(numConnections)
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, dbTable, connectionProperties);
}
public static Dataset<OrgSimRel> createRelations(
final SparkSession spark,
final String mergeRelsPath,
final String entitiesPath) {
Dataset<Tuple2<String, Organization>> entities = spark
.read()
.textFile(entitiesPath)
.map(
(MapFunction<String, Tuple2<String, Organization>>) it -> {
Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
Dataset<Tuple3<String, String, String>> relations = spark
.createDataset(
spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'")
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
.filter(t -> !t._2().contains("openorgsmesh"))
.groupByKey()
.map(g -> Lists.newArrayList(g._2()))
.filter(l -> l.size() > 1)
.flatMap(l -> {
String groupId = "group::" + UUID.randomUUID();
List<String> ids = sortIds(l);
List<Tuple3<String, String, String>> rels = new ArrayList<>();
String source = ids.get(0);
for (String target : ids) {
rels.add(new Tuple3<>(source, target, groupId));
}
return rels.iterator();
})
.rdd(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
Dataset<Tuple2<String, OrgSimRel>> relations2 = relations // <openorgs, corda>
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
r._1()._3()),
Encoders.bean(OrgSimRel.class))
.map(
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class)));
return relations2
.joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple2<String, OrgSimRel>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
OrgSimRel orgSimRel = r._1()._2();
orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0));
return orgSimRel;
},
Encoders.bean(OrgSimRel.class));
}
// sort the ids so that the best one comes first. Priority: 1) openorgs, 2) corda, 3) alphabetical order
public static List<String> sortIds(List<String> ids) {
ids.sort((o1, o2) -> {
if (o1.contains("openorgs____") && o2.contains("openorgs____"))
return o1.compareTo(o2);
if (o1.contains("corda") && o2.contains("corda"))
return o1.compareTo(o2);
if (o1.contains("openorgs____"))
return -1;
if (o2.contains("openorgs____"))
return 1;
if (o1.contains("corda"))
return -1;
if (o2.contains("corda"))
return 1;
return o1.compareTo(o2);
});
return ids;
}
public static Dataset<OrgSimRel> createRelationsFromScratch(
final SparkSession spark,
final String mergeRelsPath,
final String entitiesPath) {
// <id, json_entity>
Dataset<Tuple2<String, Organization>> entities = spark
.read()
.textFile(entitiesPath)
.map(
(MapFunction<String, Tuple2<String, Organization>>) it -> {
Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
Dataset<Tuple2<String, String>> relations = spark
.createDataset(
spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'")
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
.groupByKey()
.flatMap(g -> {
List<Tuple2<String, String>> rels = new ArrayList<>();
for (String id1 : g._2()) {
for (String id2 : g._2()) {
if (!id1.equals(id2))
if (id1.contains("openorgs____") && !id2.contains("openorgsmesh"))
rels.add(new Tuple2<>(id1, id2));
}
}
return rels.iterator();
})
.rdd(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
Dataset<Tuple2<String, OrgSimRel>> relations2 = relations // <openorgs, corda>
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
"group::" + r._1()._1()),
Encoders.bean(OrgSimRel.class))
.map(
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class)));
return relations2
.joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple2<String, OrgSimRel>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
OrgSimRel orgSimRel = r._1()._2();
orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0));
return orgSimRel;
},
Encoders.bean(OrgSimRel.class));
}
}
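For reference, sortIds reorders the list in place so that the preferred id ends up first: openorgs before corda, corda before anything else, ties broken alphabetically. A small sketch with made-up ids:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels;

public class SortIdsSketch {
	public static void main(String[] args) {
		// hypothetical organization ids
		List<String> ids = new ArrayList<>(Arrays.asList(
			"20|grid________::aaaa",
			"20|corda_______::bbbb",
			"20|openorgs____::cccc"));

		// openorgs____ first, then corda_______, then the rest alphabetically
		System.out.println(SparkPrepareOrgRels.sortIds(ids));
	}
}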

@ -28,8 +28,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
SOURCE, TARGET
}
public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark)
throws Exception {
public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
super(parser, spark);
}

@ -12,6 +12,7 @@ import org.codehaus.jackson.annotate.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.pace.util.PaceException;
public class ConnectedComponent implements Serializable {
@ -36,7 +37,7 @@ public class ConnectedComponent implements Serializable {
if (docIds.size() > 1) {
final String s = getMin();
String prefix = s.split("\\|")[0];
ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s);
return ccId;
} else {
return docIds.iterator().next();

@ -0,0 +1,138 @@
package eu.dnetlib.dhp.oa.dedup.model;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.DatePicker;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier> {
public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
public static String BASE_DATE = "2000-01-01";
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
private T entity;
public static <T extends OafEntity> Identifier newInstance(T entity) {
return new Identifier(entity);
}
public Identifier(T entity) {
this.entity = entity;
}
public T getEntity() {
return entity;
}
public void setEntity(T entity) {
this.entity = entity;
}
public Date getDate() {
String date = BASE_DATE;
if (ModelSupport.isSubClass(getEntity(), Result.class)) {
Result result = (Result) getEntity();
if (isWellformed(result.getDateofacceptance())) {
date = result.getDateofacceptance().getValue();
}
}
try {
return sdf.parse(date);
} catch (ParseException e) {
return new Date();
}
}
private static boolean isWellformed(Field<String> date) {
return date != null && StringUtils.isNotBlank(date.getValue())
&& date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue());
}
public List<KeyValue> getCollectedFrom() {
return entity.getCollectedfrom();
}
public EntityType getEntityType() {
return EntityType.fromClass(entity.getClass());
}
public String getOriginalID() {
return entity.getId();
}
private PidType getPidType() {
return PidType.tryValueOf(StringUtils.substringBefore(StringUtils.substringAfter(entity.getId(), "|"), "_"));
}
@Override
public int compareTo(Identifier i) {
// priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type), 3) date, 4) alphabetical order of the originalID
Set<String> lKeys = Optional
.ofNullable(getCollectedFrom())
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
.orElse(Sets.newHashSet());
final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
Set<String> rKeys = cf
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
.orElse(Sets.newHashSet());
if (this.getPidType().compareTo(i.getPidType()) == 0) { // same type
if (getEntityType() == EntityType.publication) {
if (isFromDatasourceID(lKeys, CROSSREF_ID)
&& !isFromDatasourceID(rKeys, CROSSREF_ID))
return -1;
if (isFromDatasourceID(rKeys, CROSSREF_ID)
&& !isFromDatasourceID(lKeys, CROSSREF_ID))
return 1;
}
if (getEntityType() == EntityType.dataset) {
if (isFromDatasourceID(lKeys, DATACITE_ID)
&& !isFromDatasourceID(rKeys, DATACITE_ID))
return -1;
if (isFromDatasourceID(rKeys, DATACITE_ID)
&& !isFromDatasourceID(lKeys, DATACITE_ID))
return 1;
}
if (this.getDate().compareTo(i.getDate()) == 0) {// same date
// take the alphabetically lower id
return this.getOriginalID().compareTo(i.getOriginalID());
} else
// take the older date
return this.getDate().compareTo(i.getDate());
} else {
return new PidComparator<>(getEntity()).compare(toSP(getPidType()), toSP(i.getPidType()));
}
}
private StructuredProperty toSP(PidType pidType) {
return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
}
public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
return collectedFrom.contains(dsId);
}
}

@ -0,0 +1,108 @@
package eu.dnetlib.dhp.oa.dedup.model;
import java.io.Serializable;
public class OrgSimRel implements Serializable {
String local_id;
String oa_original_id;
String oa_name;
String oa_acronym;
String oa_country;
String oa_url;
String oa_collectedfrom;
String group_id;
public OrgSimRel() {
}
public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
String oa_url, String oa_collectedfrom, String group_id) {
this.local_id = local_id;
this.oa_original_id = oa_original_id;
this.oa_name = oa_name;
this.oa_acronym = oa_acronym;
this.oa_country = oa_country;
this.oa_url = oa_url;
this.oa_collectedfrom = oa_collectedfrom;
this.group_id = group_id;
}
public String getLocal_id() {
return local_id;
}
public void setLocal_id(String local_id) {
this.local_id = local_id;
}
public String getOa_original_id() {
return oa_original_id;
}
public void setOa_original_id(String oa_original_id) {
this.oa_original_id = oa_original_id;
}
public String getOa_name() {
return oa_name;
}
public void setOa_name(String oa_name) {
this.oa_name = oa_name;
}
public String getOa_acronym() {
return oa_acronym;
}
public void setOa_acronym(String oa_acronym) {
this.oa_acronym = oa_acronym;
}
public String getOa_country() {
return oa_country;
}
public void setOa_country(String oa_country) {
this.oa_country = oa_country;
}
public String getOa_url() {
return oa_url;
}
public void setOa_url(String oa_url) {
this.oa_url = oa_url;
}
public String getOa_collectedfrom() {
return oa_collectedfrom;
}
public void setOa_collectedfrom(String oa_collectedfrom) {
this.oa_collectedfrom = oa_collectedfrom;
}
public String getGroup_id() {
return group_id;
}
public void setGroup_id(String group_id) {
this.group_id = group_id;
}
@Override
public String toString() {
return "OrgSimRel{" +
"local_id='" + local_id + '\'' +
", oa_original_id='" + oa_original_id + '\'' +
", oa_name='" + oa_name + '\'' +
", oa_acronym='" + oa_acronym + '\'' +
", oa_country='" + oa_country + '\'' +
", oa_url='" + oa_url + '\'' +
", oa_collectedfrom='" + oa_collectedfrom + '\'' +
'}';
}
}

@ -0,0 +1,44 @@
[
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
},
{
"paramName": "purl",
"paramLongName": "postgresUrl",
"paramDescription": "the url of the postgres server",
"paramRequired": true
},
{
"paramName": "pusr",
"paramLongName": "postgresUser",
"paramDescription": "the owner of the postgres database",
"paramRequired": true
},
{
"paramName": "ppwd",
"paramLongName": "postgresPassword",
"paramDescription": "the password for the postgres user",
"paramRequired": true
}
]

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

@ -0,0 +1,208 @@
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property>
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbTable</name>
<description>the name of the table in the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the password of the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="resetWorkingPath">
<fs>
<delete path="${workingPath}"/>
</fs>
<ok to="copyRelations"/>
<error to="Kill"/>
</action>
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-pb</arg>
<arg>${graphBasePath}/relation</arg>
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
</distcp>
<ok to="CreateSimRel"/>
<error to="Kill"/>
</action>
<action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateMergeRel"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark>
<ok to="PrepareNewOrgs"/>
<error to="Kill"/>
</action>
<action name="PrepareNewOrgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare New Organizations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

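Not part of the diff: a minimal sketch of how a workflow like the one above could be submitted programmatically, assuming the Apache Oozie Java client is available on the classpath. The Oozie endpoint, HDFS application path and all parameter values are placeholders, not values taken from this repository.

import java.util.Properties;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

public class SubmitOrgDedupWorkflowExample {

	public static void main(String[] args) throws OozieClientException {
		// Placeholder Oozie endpoint; use the one exposed by your cluster.
		OozieClient client = new OozieClient("http://oozie-host:11000/oozie");

		Properties conf = client.createConfiguration();
		// HDFS directory containing the workflow.xml shown above (placeholder path).
		conf.setProperty(OozieClient.APP_PATH, "hdfs://nameservice1/user/dedup/orgdedup");

		// Values for the parameters declared in the <parameters> section (all placeholders).
		conf.setProperty("graphBasePath", "/tmp/graph");
		conf.setProperty("isLookUpUrl", "http://lookup-host/is/services/isLookUp");
		conf.setProperty("actionSetId", "dedup-similarity-organization");
		conf.setProperty("workingPath", "/tmp/dedup_working_dir");
		conf.setProperty("dedupGraphPath", "/tmp/dedup_graph");
		conf.setProperty("cutConnectedComponent", "200");
		conf.setProperty("dbUrl", "jdbc:postgresql://localhost:5432/dnet_openorgs");
		conf.setProperty("dbUser", "dnet");
		conf.setProperty("dbTable", "oa_duplicates");
		conf.setProperty("dbPwd", "***");
		conf.setProperty("sparkDriverMemory", "4G");
		conf.setProperty("sparkExecutorMemory", "7G");
		conf.setProperty("sparkExecutorCores", "4");
		// queueName, oozieLauncherQueueName, oozieActionShareLibForSpark2,
		// spark2YarnHistoryServerAddress and spark2EventLogDir must be supplied as well.

		// Submit and start the workflow, then print the assigned job id.
		String jobId = client.run(conf);
		System.out.println("workflow job submitted: " + jobId);
	}
}
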
@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

@ -0,0 +1,240 @@
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property>
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbTable</name>
<description>the name of the table in the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the password of the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="resetWorkingPath">
<fs>
<delete path="${workingPath}"/>
</fs>
<ok to="copyRelations"/>
<error to="Kill"/>
</action>
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-pb</arg>
<arg>/tmp/graph_openorgs_and_corda/relation</arg>
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
</distcp>
<ok to="CreateSimRel"/>
<error to="Kill"/>
</action>
<action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateMergeRel"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark>
<ok to="PrepareOrgRels"/>
<error to="Kill"/>
</action>
<action name="PrepareOrgRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare Organization Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="PrepareNewOrgs"/>
<error to="Kill"/>
</action>
<action name="PrepareNewOrgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare New Organizations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--apiUrl</arg><arg>${apiUrl}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -0,0 +1,62 @@
[
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "the working directory path",
"paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url of the lookup service",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "the id of the actionset (orchestrator)",
"paramRequired": true
},
{
"paramName": "nc",
"paramLongName": "numConnections",
"paramDescription": "number of connections to the postgres db (for the write operation)",
"paramRequired": false
},
{
"paramName": "au",
"paramLongName": "apiUrl",
"paramDescription": "the url for the APIs of the openorgs service",
"paramRequired": false
},
{
"paramName": "du",
"paramLongName": "dbUrl",
"paramDescription": "the url of the database",
"paramRequired": true
},
{
"paramName": "dusr",
"paramLongName": "dbUser",
"paramDescription": "the user of the database",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "dbTable",
"paramDescription": "the name of the table in the database",
"paramRequired": true
},
{
"paramName": "dpwd",
"paramLongName": "dbPwd",
"paramDescription": "the password for the user of the database",
"paramRequired": true
}
]

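Not part of the diff: a minimal sketch of how a parameter file like the one above is typically consumed, following the ArgumentApplicationParser pattern used in SparkDedupTest further down. The classpath resource name, the argument values and the existence of a String-returning get accessor are assumptions made for illustration only.

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class PrepareNewOrgsArgsExample {

	public static void main(String[] args) throws Exception {
		// Load the JSON parameter descriptions from the classpath (assumed resource path).
		ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					PrepareNewOrgsArgsExample.class
						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json")));

		// Short names (-i, -w, ...) map to the long names declared in the JSON above.
		parser
			.parseArgument(
				new String[] {
					"-i", "/tmp/graph",
					"-w", "/tmp/dedup_working_dir",
					"-la", "http://lookup-host/is/services/isLookUp",
					"-asi", "test-orchestrator",
					"-du", "jdbc:postgresql://localhost:5432/dnet_openorgs",
					"-dusr", "dnet",
					"-t", "oa_duplicates",
					"-dpwd", ""
				});

		// Optional parameters (numConnections, apiUrl) may be omitted.
		System.out.println("dbUrl = " + parser.get("dbUrl"));
	}
}
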
@ -0,0 +1,56 @@
[
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "the working directory path",
"paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url of the lookup service",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "the id of the actionset (orchestrator)",
"paramRequired": true
},
{
"paramName": "nc",
"paramLongName": "numConnections",
"paramDescription": "number of connections to the postgres db (for the write operation)",
"paramRequired": false
},
{
"paramName": "du",
"paramLongName": "dbUrl",
"paramDescription": "the url of the database",
"paramRequired": true
},
{
"paramName": "dusr",
"paramLongName": "dbUser",
"paramDescription": "the user of the database",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "dbTable",
"paramDescription": "the name of the table in the database",
"paramRequired": true
},
{
"paramName": "dpwd",
"paramLongName": "dbPwd",
"paramDescription": "the password for the user of the database",
"paramRequired": true
}
]

@ -9,6 +9,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
@ -21,13 +22,16 @@ import scala.Tuple2;
public class EntityMergerTest implements Serializable {
List<Tuple2<String, Publication>> publications;
List<Tuple2<String, Publication>> publications2;
private List<Tuple2<String, Publication>> publications;
private List<Tuple2<String, Publication>> publications2;
private List<Tuple2<String, Publication>> publications3;
private List<Tuple2<String, Publication>> publications4;
private List<Tuple2<String, Publication>> publications5;
String testEntityBasePath;
DataInfo dataInfo;
String dedupId = "dedup_id";
Publication pub_top;
private String testEntityBasePath;
private DataInfo dataInfo;
private String dedupId = "00|dedup_id::1";
private Publication pub_top;
@BeforeEach
public void setUp() throws Exception {
@ -39,6 +43,9 @@ public class EntityMergerTest implements Serializable {
publications = readSample(testEntityBasePath + "/publication_merge.json", Publication.class);
publications2 = readSample(testEntityBasePath + "/publication_merge2.json", Publication.class);
publications3 = readSample(testEntityBasePath + "/publication_merge3.json", Publication.class);
publications4 = readSample(testEntityBasePath + "/publication_merge4.json", Publication.class);
publications5 = readSample(testEntityBasePath + "/publication_merge5.json", Publication.class);
pub_top = getTopPub(publications);
@ -48,13 +55,17 @@ public class EntityMergerTest implements Serializable {
@Test
public void softwareMergerTest() throws InstantiationException, IllegalAccessException {
List<Tuple2<String, Software>> softwares = readSample(
testEntityBasePath + "/software_merge.json", Software.class);
Software merged = DedupRecordFactory
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
assertEquals("OPEN SOURCE", merged.getBestaccessright().getClassid());
assertEquals("50|doi_dedup___::0968af610a356656706657e4f234b340", merged.getId());
}
@Test
@ -63,45 +74,46 @@ public class EntityMergerTest implements Serializable {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications.iterator(), 0, dataInfo, Publication.class);
assertEquals(dedupId, pub_merged.getId());
assertEquals(pub_merged.getJournal(), pub_top.getJournal());
assertEquals(pub_merged.getBestaccessright().getClassid(), "OPEN");
assertEquals(pub_merged.getResulttype(), pub_top.getResulttype());
assertEquals(pub_merged.getLanguage(), pub_merged.getLanguage());
assertEquals(pub_merged.getPublisher(), pub_top.getPublisher());
assertEquals(pub_merged.getEmbargoenddate(), pub_top.getEmbargoenddate());
assertEquals(pub_merged.getResourcetype().getClassid(), "0004");
assertEquals(pub_merged.getDateoftransformation(), pub_top.getDateoftransformation());
assertEquals(pub_merged.getOaiprovenance(), pub_top.getOaiprovenance());
assertEquals(pub_merged.getDateofcollection(), pub_top.getDateofcollection());
assertEquals(pub_merged.getInstance().size(), 3);
assertEquals(pub_merged.getCountry().size(), 2);
assertEquals(pub_merged.getSubject().size(), 0);
assertEquals(pub_merged.getTitle().size(), 2);
assertEquals(pub_merged.getRelevantdate().size(), 0);
assertEquals(pub_merged.getDescription().size(), 0);
assertEquals(pub_merged.getSource().size(), 0);
assertEquals(pub_merged.getFulltext().size(), 0);
assertEquals(pub_merged.getFormat().size(), 0);
assertEquals(pub_merged.getContributor().size(), 0);
assertEquals(pub_merged.getCoverage().size(), 0);
assertEquals(pub_merged.getContext().size(), 0);
assertEquals(pub_merged.getExternalReference().size(), 0);
assertEquals(pub_merged.getOriginalId().size(), 3);
assertEquals(pub_merged.getCollectedfrom().size(), 3);
assertEquals(pub_merged.getPid().size(), 1);
assertEquals(pub_merged.getExtraInfo().size(), 0);
// verify id
assertEquals("50|doi_dedup___::0968af610a356656706657e4f234b340", pub_merged.getId());
assertEquals(pub_top.getJournal(), pub_merged.getJournal());
assertEquals("OPEN", pub_merged.getBestaccessright().getClassid());
assertEquals(pub_top.getResulttype(), pub_merged.getResulttype());
assertEquals(pub_top.getLanguage(), pub_merged.getLanguage());
assertEquals(pub_top.getPublisher(), pub_merged.getPublisher());
assertEquals(pub_top.getEmbargoenddate(), pub_merged.getEmbargoenddate());
assertEquals(pub_top.getResourcetype().getClassid(), "");
assertEquals(pub_top.getDateoftransformation(), pub_merged.getDateoftransformation());
assertEquals(pub_top.getOaiprovenance(), pub_merged.getOaiprovenance());
assertEquals(pub_top.getDateofcollection(), pub_merged.getDateofcollection());
assertEquals(3, pub_merged.getInstance().size());
assertEquals(2, pub_merged.getCountry().size());
assertEquals(0, pub_merged.getSubject().size());
assertEquals(2, pub_merged.getTitle().size());
assertEquals(0, pub_merged.getRelevantdate().size());
assertEquals(0, pub_merged.getDescription().size());
assertEquals(0, pub_merged.getSource().size());
assertEquals(0, pub_merged.getFulltext().size());
assertEquals(0, pub_merged.getFormat().size());
assertEquals(0, pub_merged.getContributor().size());
assertEquals(0, pub_merged.getCoverage().size());
assertEquals(0, pub_merged.getContext().size());
assertEquals(0, pub_merged.getExternalReference().size());
assertEquals(3, pub_merged.getOriginalId().size());
assertEquals(3, pub_merged.getCollectedfrom().size());
assertEquals(1, pub_merged.getPid().size());
assertEquals(0, pub_merged.getExtraInfo().size());
// verify datainfo
assertEquals(pub_merged.getDataInfo(), dataInfo);
assertEquals(dataInfo, pub_merged.getDataInfo());
// verify datepicker
assertEquals(pub_merged.getDateofacceptance().getValue(), "2018-09-30");
assertEquals("2018-09-30", pub_merged.getDateofacceptance().getValue());
// verify authors
assertEquals(pub_merged.getAuthor().size(), 9);
assertEquals(AuthorMerger.countAuthorsPids(pub_merged.getAuthor()), 4);
assertEquals(9, pub_merged.getAuthor().size());
assertEquals(4, AuthorMerger.countAuthorsPids(pub_merged.getAuthor()));
// verify title
int count = 0;
@ -109,7 +121,7 @@ public class EntityMergerTest implements Serializable {
if (title.getQualifier().getClassid().equals("main title"))
count++;
}
assertEquals(count, 1);
assertEquals(1, count);
}
@Test
@ -118,9 +130,47 @@ public class EntityMergerTest implements Serializable {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications2.iterator(), 0, dataInfo, Publication.class);
assertEquals(pub_merged.getAuthor().size(), 27);
// insert assertions here
// verify id
assertEquals("50|doi_dedup___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
assertEquals(27, pub_merged.getAuthor().size());
}
@Test
public void publicationMergerTest3() throws InstantiationException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|doi_dedup___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
}
@Test
public void publicationMergerTest4() throws InstantiationException, IllegalStateException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|dedup_wf_001::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
}
@Test
public void publicationMergerTest5() throws InstantiationException, IllegalStateException, IllegalAccessException {
System.out
.println(
publications5
.stream()
.map(p -> p._2().getId())
.collect(Collectors.toList()));
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications5.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|dedup_wf_001::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
}
public DataInfo setDI() {

@ -0,0 +1,117 @@
package eu.dnetlib.dhp.oa.dedup;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class IdGeneratorTest {
private static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static List<Identifier<Publication>> bestIds;
private static List<Identifier<Publication>> bestIds2;
private static List<Identifier<Publication>> bestIds3;
private static String testEntityBasePath;
@BeforeAll
public static void setUp() throws Exception {
testEntityBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
.toFile()
.getAbsolutePath();
bestIds = createBestIds(testEntityBasePath + "/publication_idgeneration.json", Publication.class);
bestIds2 = createBestIds(testEntityBasePath + "/publication_idgeneration2.json", Publication.class);
bestIds3 = createBestIds(testEntityBasePath + "/publication_idgeneration3.json", Publication.class);
}
@Test
public void generateIdTest1() {
String id1 = IdGenerator.generate(bestIds, "50|defaultID");
System.out
.println("id list 1 = " + bestIds.stream().map(i -> i.getOriginalID()).collect(Collectors.toList()));
assertEquals("50|doi_dedup___::0968af610a356656706657e4f234b340", id1);
}
@Test
public void generateIdTest2() {
String id1 = IdGenerator.generate(bestIds2, "50|defaultID");
String id2 = IdGenerator.generate(bestIds3, "50|defaultID");
System.out
.println("id list 2 = " + bestIds2.stream().map(i -> i.getOriginalID()).collect(Collectors.toList()));
System.out.println("winner 2 = " + id1);
System.out
.println("id list 3 = " + bestIds3.stream().map(i -> i.getOriginalID()).collect(Collectors.toList()));
System.out.println("winner 3 = " + id2);
assertEquals("50|doi_dedup___::1a77a3bba737f8b669dcf330ad3b37e2", id1);
assertEquals("50|dedup_wf_001::0829b5191605bdbea36d6502b8c1ce1g", id2);
}
protected static <T extends OafEntity> List<Identifier<T>> createBestIds(String path, Class<T> clazz) {
final Stream<Identifier<T>> ids = readSample(path, clazz)
.stream()
.map(Tuple2::_2)
.map(Identifier::newInstance);
return ids.collect(Collectors.toList());
}
public static <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
List<Tuple2<String, T>> res = new ArrayList<>();
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(path));
String line = reader.readLine();
while (line != null) {
res
.add(
new Tuple2<>(
MapDocumentUtil.getJPathString("$.id", line),
OBJECT_MAPPER.readValue(line, clazz)));
// read next line
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
return res;
}
public static StructuredProperty pid(String pid, String classid, String classname) {
return OafMapperUtils.structuredProperty(pid, classid, classname, "", "", new DataInfo());
}
public static List<KeyValue> keyValue(String key, String value) {
return Lists.newArrayList(OafMapperUtils.keyValue(key, value));
}
}

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@ -13,9 +12,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@ -59,6 +55,7 @@ public class SparkDedupTest implements Serializable {
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator";
private static String testDedupAssertionsBasePath;
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
@ -73,6 +70,10 @@ public class SparkDedupTest implements Serializable {
testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
testDedupAssertionsBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
.toFile()
.getAbsolutePath();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@ -87,6 +88,7 @@ public class SparkDedupTest implements Serializable {
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
@BeforeEach
@ -157,6 +159,7 @@ public class SparkDedupTest implements Serializable {
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
@ -171,27 +174,27 @@ public class SparkDedupTest implements Serializable {
long orgs_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
long pubs_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
long sw_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
assertEquals(3082, orgs_simrel);
@ -203,6 +206,67 @@ public class SparkDedupTest implements Serializable {
@Test
@Order(2)
public void collectSimRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCollectSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath,
"-np", "50",
"-purl", "jdbc:postgresql://localhost:5432/dnet_dedup",
"-pusr", "postgres_user",
"-ppwd", ""
});
new SparkCollectSimRels(
parser,
spark,
spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"),
spark.read().load(testDedupAssertionsBasePath + "/groups"))
.run(isLookUpService);
long orgs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
long pubs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
long sw_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
assertEquals(3672, orgs_simrel);
assertEquals(10459, pubs_simrel);
assertEquals(3767, sw_simrel);
assertEquals(3865, ds_simrel);
assertEquals(10173, orp_simrel);
}
@Test
@Order(3)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -211,6 +275,7 @@ public class SparkDedupTest implements Serializable {
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
@ -297,7 +362,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(3)
@Order(4)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -306,6 +371,7 @@ public class SparkDedupTest implements Serializable {
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
@ -351,7 +417,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(4)
@Order(5)
public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -390,7 +456,7 @@ public class SparkDedupTest implements Serializable {
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
.count();
assertEquals(85, orgs_deduprecord);
assertEquals(84, orgs_deduprecord);
assertEquals(65, pubs_deduprecord);
assertEquals(51, sw_deduprecord);
assertEquals(97, ds_deduprecord);
@ -398,7 +464,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(5)
@Order(6)
public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -474,7 +540,7 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(896, publications);
assertEquals(838, organizations);
assertEquals(837, organizations);
assertEquals(100, projects);
assertEquals(100, datasource);
assertEquals(200, softwares);
@ -514,7 +580,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(6)
@Order(7)
public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -564,7 +630,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(7)
@Order(8)
public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);

@ -0,0 +1,3 @@
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1g", "pid" : [ { "value" : "pid1", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
{ "id" : "50|doi_________::1a77a3bba737f8b669dcf330ad3b37e2", "pid" : [ { "value" : "pid2", "qualifier" : { "classid" : "doi" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f", "pid" : [ { "value" : "pid3", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }

@ -0,0 +1,3 @@
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1g", "pid" : [ { "value" : "pid1", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1h", "pid" : [ { "value" : "pid2", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1i", "pid" : [ { "value" : "pid3", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }

@ -3,13 +3,9 @@ package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.BufferedInputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@ -23,10 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
import java.util.*;
@ -12,10 +12,13 @@ import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.LicenseComparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public abstract class AbstractMdRecordToOafMapper {
@ -133,20 +136,34 @@ public abstract class AbstractMdRecordToOafMapper {
final DataInfo info,
final long lastUpdateTimestamp) {
final List<Oaf> oafs = new ArrayList<>();
final OafEntity entity = createEntity(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
final String id = IdentifierFactory.createIdentifier(entity);
if (!id.equals(entity.getId())) {
entity.getOriginalId().add(entity.getId());
entity.setId(id);
}
final List<Oaf> oafs = Lists.newArrayList(entity);
if (!oafs.isEmpty()) {
oafs.addAll(addProjectRels(doc, entity));
oafs.addAll(addOtherResultRels(doc, entity));
}
return oafs;
}
private OafEntity createEntity(Document doc, String type, List<Instance> instances, KeyValue collectedFrom,
DataInfo info, long lastUpdateTimestamp) {
switch (type.toLowerCase()) {
case "publication":
final Publication p = new Publication();
populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp);
p.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
p.setJournal(prepareJournal(doc, info));
oafs.add(p);
break;
return p;
case "dataset":
final Dataset d = new Dataset();
populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp);
d.setResulttype(DATASET_DEFAULT_RESULTTYPE);
d.setStoragedate(prepareDatasetStorageDate(doc, info));
d.setDevice(prepareDatasetDevice(doc, info));
d.setSize(prepareDatasetSize(doc, info));
@ -154,48 +171,34 @@ public abstract class AbstractMdRecordToOafMapper {
d.setLastmetadataupdate(prepareDatasetLastMetadataUpdate(doc, info));
d.setMetadataversionnumber(prepareDatasetMetadataVersionNumber(doc, info));
d.setGeolocation(prepareDatasetGeoLocations(doc, info));
oafs.add(d);
break;
return d;
case "software":
final Software s = new Software();
populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp);
s.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
s.setLicense(prepareSoftwareLicenses(doc, info));
s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info));
oafs.add(s);
break;
return s;
case "":
case "otherresearchproducts":
default:
final OtherResearchProduct o = new OtherResearchProduct();
populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp);
o.setResulttype(ORP_DEFAULT_RESULTTYPE);
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
o.setTool(prepareOtherResearchProductTools(doc, info));
oafs.add(o);
break;
return o;
}
if (!oafs.isEmpty()) {
oafs.addAll(addProjectRels(doc, collectedFrom, info, lastUpdateTimestamp));
oafs.addAll(addOtherResultRels(doc, collectedFrom, info, lastUpdateTimestamp));
}
return oafs;
}
private List<Oaf> addProjectRels(
final Document doc,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp) {
final OafEntity entity) {
final List<Oaf> res = new ArrayList<>();
final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false);
final String docId = entity.getId();
for (final Object o : doc.selectNodes("//oaf:projectid")) {
@ -207,13 +210,11 @@ public abstract class AbstractMdRecordToOafMapper {
res
.add(
getRelation(
docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, collectedFrom, info,
lastUpdateTimestamp));
docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity));
res
.add(
getRelation(
projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, collectedFrom, info,
lastUpdateTimestamp));
projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity));
}
}
@ -225,26 +226,22 @@ public abstract class AbstractMdRecordToOafMapper {
final String relType,
final String subRelType,
final String relClass,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp) {
final OafEntity entity) {
final Relation rel = new Relation();
rel.setRelType(relType);
rel.setSubRelType(subRelType);
rel.setRelClass(relClass);
rel.setSource(source);
rel.setTarget(target);
rel.setCollectedfrom(Arrays.asList(collectedFrom));
rel.setDataInfo(info);
rel.setLastupdatetimestamp(lastUpdateTimestamp);
rel.setCollectedfrom(entity.getCollectedfrom());
rel.setDataInfo(entity.getDataInfo());
rel.setLastupdatetimestamp(entity.getLastupdatetimestamp());
return rel;
}
protected abstract List<Oaf> addOtherResultRels(
final Document doc,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp);
final OafEntity entity);
private void populateResultFields(
final Result r,
@ -257,7 +254,7 @@ public abstract class AbstractMdRecordToOafMapper {
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false));
r.setOriginalId(Arrays.asList(findOriginalId(doc)));
r.setOriginalId(Lists.newArrayList(findOriginalId(doc)));
r.setCollectedfrom(Arrays.asList(collectedFrom));
r.setPid(prepareResultPids(doc, info));
@ -285,7 +282,7 @@ public abstract class AbstractMdRecordToOafMapper {
r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
r.setInstance(instances);
r.setBestaccessright(getBestAccessRights(instances));
r.setBestaccessright(OafMapperUtils.createBestAccessRights(instances));
}
protected abstract List<StructuredProperty> prepareResultPids(Document doc, DataInfo info);
@ -370,38 +367,6 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info);
public static Qualifier createBestAccessRights(final List<Instance> instanceList) {
return getBestAccessRights(instanceList);
}
protected static Qualifier getBestAccessRights(final List<Instance> instanceList) {
if (instanceList != null) {
final Optional<Qualifier> min = instanceList
.stream()
.map(i -> i.getAccessright())
.min(new LicenseComparator());
final Qualifier rights = min.isPresent() ? min.get() : new Qualifier();
if (StringUtils.isBlank(rights.getClassid())) {
rights.setClassid(UNKNOWN);
}
if (StringUtils.isBlank(rights.getClassname())
|| UNKNOWN.equalsIgnoreCase(rights.getClassname())) {
rights.setClassname(NOT_AVAILABLE);
}
if (StringUtils.isBlank(rights.getSchemeid())) {
rights.setSchemeid(DNET_ACCESS_MODES);
}
if (StringUtils.isBlank(rights.getSchemename())) {
rights.setSchemename(DNET_ACCESS_MODES);
}
return rights;
}
return null;
}
private Journal prepareJournal(final Document doc, final DataInfo info) {
final Node n = doc.selectSingleNode("//oaf:journal");
if (n != null) {
@ -564,4 +529,5 @@ public abstract class AbstractMdRecordToOafMapper {
}
return res;
}
}

@ -4,11 +4,9 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@ -20,6 +18,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,16 +28,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
@ -124,7 +114,18 @@ public class GenerateEntitiesApplication {
private static Oaf merge(final Oaf o1, final Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
((OafEntity) o1).mergeFrom((OafEntity) o2);
if (ModelSupport.isSubClass(o1, Result.class)) {
return mergeResults((Result) o1, (Result) o2);
} else if (ModelSupport.isSubClass(o1, Datasource.class)) {
((Datasource) o1).mergeFrom((Datasource) o2);
} else if (ModelSupport.isSubClass(o1, Organization.class)) {
((Organization) o1).mergeFrom((Organization) o2);
} else if (ModelSupport.isSubClass(o1, Project.class)) {
((Project) o1).mergeFrom((Project) o2);
} else {
throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName());
}
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
@ -133,6 +134,19 @@ public class GenerateEntitiesApplication {
return o1;
}
protected static Result mergeResults(Result o1, Result o2) {
Result r1 = o1;
Result r2 = o2;
if (new ResultTypeComparator().compare(r1, r2) < 0) {
r1.mergeFrom(r2);
return r1;
} else {
r2.mergeFrom(r1);
return r2;
}
}
private static List<Oaf> convertToListOaf(
final String id,
final String s,

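Not part of the diff: a minimal sketch of the merge precedence introduced by mergeResults above, assuming the dhp-schemas model classes (Publication, Dataset, ModelConstants) and a class placed in the same package so the protected method is accessible. The expected outcome mirrors the assertions of GenerateEntitiesApplicationTest shown at the end of this changeset.

package eu.dnetlib.dhp.oa.graph.raw;

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;

public class MergeResultsExample {

	public static void main(String[] args) {
		// Two Result records assumed to share the same OpenAIRE identifier.
		Publication pub = new Publication();
		pub.setResulttype(ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE);

		Dataset ds = new Dataset();
		ds.setResulttype(ModelConstants.DATASET_DEFAULT_RESULTTYPE);

		// ResultTypeComparator decides which record absorbs the other via mergeFrom();
		// per the test below, merging a publication with a dataset keeps the publication type.
		Result merged = GenerateEntitiesApplication.mergeResults(pub, ds);
		System.out.println(merged.getResulttype().getClassid());
	}
}
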
@ -1,16 +1,6 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
@ -19,19 +9,25 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PARTICIPANT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PROVIDED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PARTICIPATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROJECT_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVIDES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVISION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RELATIONSHIP;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.asString;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.journal;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.io.Closeable;
import java.io.IOException;
@ -445,22 +441,26 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
try {
final String targetType = rs.getString(TARGET_TYPE);
if (rs.getString(SOURCE_TYPE).equals("context")) {
final Result r;
if (rs.getString(TARGET_TYPE).equals("dataset")) {
r = new Dataset();
r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("software")) {
r = new Software();
r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("other")) {
r = new OtherResearchProduct();
r.setResulttype(ORP_DEFAULT_RESULTTYPE);
} else {
r = new Publication();
r.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
switch (targetType) {
case "dataset":
r = new Dataset();
break;
case "software":
r = new Software();
break;
case "other":
r = new OtherResearchProduct();
break;
case "publication":
default:
r = new Publication();
break;
}
r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setContext(prepareContext(rs.getString("source_id"), info));
@ -470,7 +470,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
return Arrays.asList(r);
} else {
final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
final String targetId = createOpenaireId(targetType, rs.getString("target_id"), false);
final Relation r1 = new Relation();
final Relation r2 = new Relation();

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.List;
@ -19,15 +19,8 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@ -93,7 +86,13 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected List<Field<String>> prepareDescriptions(final Document doc, final DataInfo info) {
return prepareListFields(doc, "//dc:description", info);
return prepareListFields(doc, "//dc:description", info)
.stream()
.map(d -> {
d.setValue(StringUtils.left(d.getValue(), ModelHardLimits.MAX_ABSTRACT_LENGTH));
return d;
})
.collect(Collectors.toList());
}
@Override
@ -257,11 +256,9 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected List<Oaf> addOtherResultRels(
final Document doc,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp) {
final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false);
final OafEntity entity) {
final String docId = entity.getId();
final List<Oaf> res = new ArrayList<>();
for (final Object o : doc.selectNodes("//*[local-name()='relatedDataset']")) {
@ -275,13 +272,11 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
res
.add(
getRelation(
docId, otherId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, collectedFrom, info,
lastUpdateTimestamp));
docId, otherId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity));
res
.add(
getRelation(
otherId, docId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, collectedFrom, info,
lastUpdateTimestamp));
otherId, docId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity));
}
}
return res;
@ -295,6 +290,10 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected List<StructuredProperty> prepareResultPids(final Document doc, final DataInfo info) {
return prepareListStructPropsWithValidQualifier(
doc, "//oaf:identifier", "@identifierType", DNET_PID_TYPES, info);
doc, "//oaf:identifier", "@identifierType", DNET_PID_TYPES, info)
.stream()
.map(CleaningFunctions::normalizePidValue)
.collect(Collectors.toList());
}
}

@ -1,34 +1,26 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.Node;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@ -198,7 +190,13 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected List<Field<String>> prepareDescriptions(final Document doc, final DataInfo info) {
return prepareListFields(doc, "//datacite:description[@descriptionType='Abstract']", info);
return prepareListFields(doc, "//datacite:description[@descriptionType='Abstract']", info)
.stream()
.map(d -> {
d.setValue(StringUtils.left(d.getValue(), ModelHardLimits.MAX_ABSTRACT_LENGTH));
return d;
})
.collect(Collectors.toList());
}
@Override
@ -314,11 +312,9 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected List<Oaf> addOtherResultRels(
final Document doc,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp) {
final OafEntity entity) {
final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false);
final String docId = entity.getId();
final List<Oaf> res = new ArrayList<>();
@ -330,30 +326,26 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final String otherId = createOpenaireId(50, originalId, false);
final String type = ((Node) o).valueOf("@relationType");
if (type.equalsIgnoreCase("IsSupplementTo")) {
if (type.equalsIgnoreCase(IS_SUPPLEMENT_TO)) {
res
.add(
getRelation(
docId, otherId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENT_TO, collectedFrom, info,
lastUpdateTimestamp));
docId, otherId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENT_TO, entity));
res
.add(
getRelation(
otherId, docId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENTED_BY, collectedFrom, info,
lastUpdateTimestamp));
} else if (type.equals("IsPartOf")) {
otherId, docId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENTED_BY, entity));
} else if (type.equalsIgnoreCase(IS_PART_OF)) {
res
.add(
getRelation(
docId, otherId, RESULT_RESULT, PART, IS_PART_OF, collectedFrom, info,
lastUpdateTimestamp));
docId, otherId, RESULT_RESULT, PART, IS_PART_OF, entity));
res
.add(
getRelation(
otherId, docId, RESULT_RESULT, PART, HAS_PARTS, collectedFrom, info,
lastUpdateTimestamp));
otherId, docId, RESULT_RESULT, PART, HAS_PARTS, entity));
} else {
// TODO catch more semantics
}
}
}
@ -384,7 +376,11 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
doc,
"//datacite:alternateIdentifier[@alternateIdentifierType != 'URL' and @alternateIdentifierType != 'landingPage']",
"@alternateIdentifierType", DNET_PID_TYPES, info));
return Lists.newArrayList(res);
return res
.stream()
.map(CleaningFunctions::normalizePidValue)
.collect(Collectors.toList());
}
}
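The reworked OdfToOafMapper methods above cap abstract length with StringUtils.left and normalise PID values before they enter the graph. Below is a minimal, self-contained sketch of the truncation pattern only, assuming nothing beyond commons-lang3; the MAX_ABSTRACT_LENGTH value is a placeholder standing in for the constant defined in ModelHardLimits.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;

public class AbstractTruncationSketch {

    // Placeholder limit; the real value is defined in ModelHardLimits.
    private static final int MAX_ABSTRACT_LENGTH = 150_000;

    // Mirrors the map step in prepareDescriptions: over-long abstracts are
    // cut with StringUtils.left rather than dropped.
    public static List<String> truncateAbstracts(final List<String> abstracts) {
        return abstracts
            .stream()
            .map(a -> StringUtils.left(a, MAX_ABSTRACT_LENGTH))
            .collect(Collectors.toList());
    }
}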

@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class Vocabulary implements Serializable {

@ -7,6 +7,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

@ -7,8 +7,6 @@ import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class GenerateEntitiesApplicationTest {
@Mock
private ISLookUpService isLookUpService;
@Mock
private VocabularyGroup vocs;
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
lenient()
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
.thenReturn(synonyms());
vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
}
@Test
public void testMergeResult() throws IOException {
Result publication = getResult("oaf_record.xml", Publication.class);
Result dataset = getResult("odf_dataset.xml", Dataset.class);
Result software = getResult("odf_software.xml", Software.class);
Result orp = getResult("oaf_orp.xml", OtherResearchProduct.class);
verifyMerge(publication, dataset, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(dataset, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(publication, software, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(software, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(publication, orp, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(orp, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(dataset, software, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(software, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(dataset, orp, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(orp, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(software, orp, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
verifyMerge(orp, software, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
}
protected <T extends Result> void verifyMerge(Result publication, Result dataset, Class<T> clazz,
String resultType) {
final Result merge = GenerateEntitiesApplication.mergeResults(publication, dataset);
assertTrue(clazz.isAssignableFrom(merge.getClass()));
assertEquals(resultType, merge.getResulttype().getClassid());
}
protected <T extends Result> Result getResult(String xmlFileName, Class<T> clazz) throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream(xmlFileName));
return new OdfToOafMapper(vocs, false)
.processMdRecord(xml)
.stream()
.filter(s -> clazz.isAssignableFrom(s.getClass()))
.map(s -> (Result) s)
.findFirst()
.get();
}
private List<String> vocs() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
}
private List<String> synonyms() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
}
}
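Read together, the verifyMerge assertions above pin down a precedence among result types when two records share the same identifier: publication wins over dataset, software and other research products; dataset wins over software and ORPs; software wins over ORPs. The sketch below only illustrates that ordering with the resulttype classid strings; it is not the project's mergeResults implementation.

import java.util.List;

public class ResultTypePrecedenceSketch {

    // From weakest to strongest, as exercised by the test above.
    private static final List<String> ORDER = List.of("other", "software", "dataset", "publication");

    // Returns the classid that a merge of the two result types is expected to keep.
    public static String stronger(final String a, final String b) {
        return ORDER.indexOf(a) >= ORDER.indexOf(b) ? a : b;
    }

    public static void main(final String[] args) {
        System.out.println(stronger("publication", "dataset")); // publication
        System.out.println(stronger("dataset", "software"));    // dataset
        System.out.println(stronger("software", "other"));      // software
    }
}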

@ -24,14 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
@ -71,7 +65,7 @@ public class MappersTest {
assertValidId(p.getId());
assertTrue(p.getOriginalId().size() == 1);
assertTrue(p.getOriginalId().size() == 2);
assertEquals("10.3897/oneeco.2.e13718", p.getOriginalId().get(0));
assertValidId(p.getCollectedfrom().get(0).getKey());
@ -123,22 +117,7 @@ public class MappersTest {
assertNotNull(p.getBestaccessright());
assertEquals("OPEN", p.getBestaccessright().getClassid());
assertValidId(r1.getSource());
assertValidId(r1.getTarget());
assertValidId(r2.getSource());
assertValidId(r2.getTarget());
assertValidId(r1.getCollectedfrom().get(0).getKey());
assertValidId(r2.getCollectedfrom().get(0).getKey());
assertNotNull(r1.getDataInfo());
assertNotNull(r2.getDataInfo());
assertNotNull(r1.getDataInfo().getTrust());
assertNotNull(r2.getDataInfo().getTrust());
assertEquals(r1.getSource(), r2.getTarget());
assertEquals(r2.getSource(), r1.getTarget());
assertTrue(StringUtils.isNotBlank(r1.getRelClass()));
assertTrue(StringUtils.isNotBlank(r2.getRelClass()));
assertTrue(StringUtils.isNotBlank(r1.getRelType()));
assertTrue(StringUtils.isNotBlank(r2.getRelType()));
verifyRelations(p, r1, r2);
// System.out.println(new ObjectMapper().writeValueAsString(p));
// System.out.println(new ObjectMapper().writeValueAsString(r1));
@ -177,7 +156,7 @@ public class MappersTest {
final Relation r2 = (Relation) list.get(2);
assertValidId(d.getId());
assertTrue(d.getOriginalId().size() == 1);
assertTrue(d.getOriginalId().size() == 2);
assertEquals("oai:zenodo.org:3234526", d.getOriginalId().get(0));
assertValidId(d.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
@ -230,10 +209,19 @@ public class MappersTest {
});
assertEquals("0001", d.getInstance().get(0).getRefereed().getClassid());
verifyRelations(d, r1, r2);
}
private void verifyRelations(OafEntity e, Relation r1, Relation r2) {
assertEquals(e.getId(), r1.getSource());
assertEquals(e.getId(), r2.getTarget());
assertValidId(r1.getSource());
assertValidId(r1.getTarget());
assertValidId(r2.getSource());
assertValidId(r2.getTarget());
assertValidId(r1.getCollectedfrom().get(0).getKey());
assertValidId(r2.getCollectedfrom().get(0).getKey());
assertNotNull(r1.getDataInfo());
assertNotNull(r2.getDataInfo());
assertNotNull(r1.getDataInfo().getTrust());
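The verifyRelations helper folded into the test above replaces the per-case assertions removed earlier in this diff: for an entity and the relation pair derived from it, one relation must leave the entity and the other must point back, with matching source/target pairs. A compact illustration of that invariant, using illustrative ids rather than real OpenAIRE identifiers:

public class RelationSymmetrySketch {

    static final class Rel {
        final String source;
        final String target;

        Rel(final String source, final String target) {
            this.source = source;
            this.target = target;
        }
    }

    // True when r1 goes entity -> other and r2 is its inverse, as verifyRelations expects.
    static boolean symmetric(final String entityId, final Rel r1, final Rel r2) {
        return entityId.equals(r1.source)
            && entityId.equals(r2.target)
            && r1.source.equals(r2.target)
            && r2.source.equals(r1.target);
    }

    public static void main(final String[] args) {
        final Rel isSupplementTo = new Rel("50|doc", "50|other");
        final Rel isSupplementedBy = new Rel("50|other", "50|doc");
        System.out.println(symmetric("50|doc", isSupplementTo, isSupplementedBy)); // true
    }
}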

@ -27,10 +27,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;

@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<record xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<header xmlns="http://namespace.openaire.eu/">
<dri:objIdentifier>pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2</dri:objIdentifier>
<dri:recordIdentifier>10.3897/oneeco.2.e13718</dri:recordIdentifier>
<dri:dateOfCollection/>
<dri:mdFormat/>
<dri:mdFormatInterpretation/>
<dri:repositoryId/>
<dr:objectIdentifier/>
<dr:dateOfCollection>2020-03-23T00:20:51.392Z</dr:dateOfCollection>
<dr:dateOfTransformation>2020-03-23T00:26:59.078Z</dr:dateOfTransformation>
<oaf:datasourceprefix>pensoft_____</oaf:datasourceprefix>
</header>
<metadata xmlns="http://namespace.openaire.eu/">
<dc:title>Ecosystem Service capacity is higher in areas of multiple designation types</dc:title>
<dc:creator>Nikolaidou,Charitini</dc:creator>
<dc:creator nameIdentifier="0000-0001-6651-1178" nameIdentifierScheme="ORCID">Votsi,Nefta</dc:creator>
<dc:creator>Sgardelis,Steanos</dc:creator>
<dc:creator>Halley,John</dc:creator>
<dc:creator>Pantis,John</dc:creator>
<dc:creator>Tsiafouli,Maria</dc:creator>
<dc:date>2017</dc:date>
<dc:description>The implementation of the Ecosystem Service (ES) concept into practice might be a challenging task as it has to take into account previous “traditional” policies and approaches that have evaluated nature and biodiversity differently. Among them the Habitat (92/43/EC) and Bird Directives (79/409/EC), the Water Framework Directive (2000/60/EC), and the Noise Directive (2002/49/EC) have led to the evaluation/designation of areas in Europe with different criteria. In this study our goal was to understand how the ES capacity of an area is related to its designation and if areas with multiple designations have higher capacity in providing ES. We selected four catchments in Greece with a great variety of characteristics covering over 25% of the national territory. Inside the catchments we assessed the ES capacity (following the methodology of Burkhard et al. 2009) of areas designated as Natura 2000 sites, Quiet areas and Wetlands or Water bodies and found those areas that have multiple designations. Data were analyzed by GLM to reveal differences regarding the ES capacity among the different types of areas. We also investigated by PCA synergies and trade-offs among different kinds of ES and tested for correlations among landscape properties, such as elevation, aspect and slope and the ES potential. Our results show that areas with different types or multiple designations have a different capacity in providing ES. Areas of one designation type (Protected or Quiet Areas) had in general intermediate scores in most ES but scores were higher compared to areas with no designation, which displayed stronger capacity in provisioning services. Among Protected Areas and Quiet Areas the latter scored better in general. Areas that combined both designation types (Protected and Quiet Areas) showed the highest capacity in 13 out of 29 ES, that were mostly linked with natural and forest ecosystems. We found significant synergies among most regulating, supporting and cultural ES which in turn display trade-offs with provisioning services. The different ES are spatially related and display strong correlation with landscape properties, such as elevation and slope. We suggest that the designation status of an area can be used as an alternative tool for environmental policy, indicating the capacity for ES provision. Multiple designations of areas can be used as proxies for locating ES “hotspots”. This integration of “traditional” evaluation and designation and the “newer” ES concept forms a time- and cost-effective way to be adopted by stakeholders and policy-makers in order to start complying with new standards and demands for nature conservation and environmental management.</dc:description>
<dc:format>text/html</dc:format>
<dc:identifier>https://doi.org/10.3897/oneeco.2.e13718</dc:identifier>
<dc:identifier>https://oneecosystem.pensoft.net/article/13718/</dc:identifier>
<dc:language>eng</dc:language>
<dc:publisher>Pensoft Publishers</dc:publisher>
<dc:relation>info:eu-repo/semantics/altIdentifier/eissn/2367-8194</dc:relation>
<dc:relation>info:eu-repo/grantAgreement/EC/FP7/226852</dc:relation>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:subject>Ecosystem Services hotspots</dc:subject>
<dc:subject>Natura 2000</dc:subject>
<dc:subject>Quiet Protected Areas</dc:subject>
<dc:subject>Biodiversity</dc:subject>
<dc:subject>Agriculture</dc:subject>
<dc:subject>Elevation</dc:subject>
<dc:subject>Slope</dc:subject>
<dc:subject>Ecosystem Service trade-offs and synergies</dc:subject>
<dc:subject> cultural services</dc:subject>
<dc:subject>provisioning services</dc:subject>
<dc:subject>regulating services</dc:subject>
<dc:subject>supporting services</dc:subject>
<dc:type>Research Artefact</dc:type>
<dr:CobjCategory type="other">0020</dr:CobjCategory>
<oaf:dateAccepted>2017-01-01</oaf:dateAccepted>
<oaf:projectid>corda_______::226852</oaf:projectid>
<oaf:accessrights>OPEN</oaf:accessrights>
<oaf:hostedBy id="openaire____::issn226852" name="One Ecosystem"/>
<oaf:collectedFrom
id="openaire____::45e3c7b69bcee6cc5fa945c9e183deb9" name="Pensoft"/>
<oaf:identifier identifierType="doi">10.3897/oneeco.2.e13718</oaf:identifier>
<oaf:fulltext>https://oneecosystem.pensoft.net/article/13718/</oaf:fulltext>
<oaf:journal eissn="2367-8194" issn="">One Ecosystem</oaf:journal>
<oaf:refereed>0001</oaf:refereed>
</metadata>
<about xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2020-03-23T00:20:51.392Z">
<baseURL>http%3A%2F%2Fzookeys.pensoft.net%2Foai.php</baseURL>
<identifier>10.3897/oneeco.2.e13718</identifier>
<datestamp>2017-09-08</datestamp>
<metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk:repository"
classname="sysimport:crosswalk:repository"
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</record>

@ -164,7 +164,7 @@ public class CreateRelatedEntitiesJob_phase1 {
if (result.getTitle() != null && !result.getTitle().isEmpty()) {
final StructuredProperty title = result.getTitle().stream().findFirst().get();
title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
title.setValue(StringUtils.left(title.getValue(), ModelHardLimits.MAX_TITLE_LENGTH));
re.setTitle(title);
}
@ -178,7 +178,7 @@ public class CreateRelatedEntitiesJob_phase1 {
.getInstance()
.stream()
.filter(Objects::nonNull)
.limit(ProvisionConstants.MAX_INSTANCES)
.limit(ModelHardLimits.MAX_INSTANCES)
.collect(Collectors.toList()));
}

@ -240,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
List<ExternalReference> refs = r
.getExternalReference()
.stream()
.limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
.limit(ModelHardLimits.MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList());
r.setExternalReference(refs);
}
if (r.getAuthor() != null) {
List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) {
a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
a.setFullname(StringUtils.left(a.getFullname(), ModelHardLimits.MAX_AUTHOR_FULLNAME_LENGTH));
if (authors.size() < ModelHardLimits.MAX_AUTHORS || hasORCID(a)) {
authors.add(a);
}
}
@ -260,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(d -> {
d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
d.setValue(StringUtils.left(d.getValue(), ModelHardLimits.MAX_ABSTRACT_LENGTH));
return d;
})
.collect(Collectors.toList());
@ -272,10 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(t -> {
t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
t.setValue(StringUtils.left(t.getValue(), ModelHardLimits.MAX_TITLE_LENGTH));
return t;
})
.limit(ProvisionConstants.MAX_TITLES)
.limit(ModelHardLimits.MAX_TITLES)
.collect(Collectors.toList());
r.setTitle(titles);
}
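Both CreateRelatedEntitiesJob phases above now read their caps from ModelHardLimits instead of ProvisionConstants. The sketch below only lists the constants that these call sites reference; every numeric value is an illustrative placeholder, not the limit actually configured in the project.

// Field names taken from the call sites above; values are placeholders.
public final class ModelHardLimitsSketch {

    public static final int MAX_TITLE_LENGTH = 5000;
    public static final int MAX_TITLES = 10;
    public static final int MAX_ABSTRACT_LENGTH = 150_000;
    public static final int MAX_AUTHORS = 200;
    public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
    public static final int MAX_INSTANCES = 10;
    public static final int MAX_EXTERNAL_ENTITIES = 50;

    private ModelHardLimitsSketch() {
    }
}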

@ -44,6 +44,7 @@
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setRawGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
@ -54,31 +55,45 @@
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setFirstCleanedGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the first CLEANED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">firstCleanedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/prod_provision/graph/02_graph_first_cleaned</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setDedupGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the DEDUPED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">dedupGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/02_graph_dedup</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/03_graph_dedup</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setInferredGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the INFERRED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">inferredGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/03_graph_inferred</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/04_graph_inferred</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setConsistentGraphPath" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the CONSISTENCY graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">consistentGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/04_graph_consistent</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/05_graph_consistent</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -89,7 +104,7 @@
<DESCRIPTION>Set the target path to store the ORCID enriched graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">orcidGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/05_graph_orcid</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/06_graph_orcid</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -100,7 +115,7 @@
<DESCRIPTION>Set the target path to store the BULK TAGGED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">bulkTaggingGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/06_graph_bulktagging</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/07_graph_bulktagging</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -111,7 +126,7 @@
<DESCRIPTION>Set the target path to store the AFFILIATION from INSTITUTIONAL REPOS graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">affiliationGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/07_graph_affiliation</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/08_graph_affiliation</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -122,7 +137,7 @@
<DESCRIPTION>Set the target path to store the COMMUNITY from SELECTED SOURCES graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">communityOrganizationGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/08_graph_comunity_organization</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/09_graph_comunity_organization</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -133,7 +148,7 @@
<DESCRIPTION>Set the target path to store the FUNDING from SEMANTIC RELATION graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">fundingGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/09_graph_funding</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/10_graph_funding</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -144,7 +159,7 @@
<DESCRIPTION>Set the target path to store the COMMUNITY from SEMANTIC RELATION graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">communitySemRelGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/10_graph_comunity_sem_rel</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/11_graph_comunity_sem_rel</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -155,7 +170,7 @@
<DESCRIPTION>Set the target path to store the COUNTRY enriched graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">countryGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/11_graph_country</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/12_graph_country</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -166,7 +181,7 @@
<DESCRIPTION>Set the target path to store the CLEANED graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">cleanedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/12_graph_cleaned</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/13_graph_cleaned</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -177,7 +192,7 @@
<DESCRIPTION>Set the target path to store the blacklisted graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">blacklistedGraphPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/13_graph_blacklisted</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/14_graph_blacklisted</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
@ -324,6 +339,31 @@
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="graphCleaningFirst"/>
</ARCS>
</NODE>
<NODE name="graphCleaningFirst" type="SubmitHadoopJob">
<DESCRIPTION>clean the properties in the graph typed as Qualifier according to the vocabulary indicated in schemeid</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'graphInputPath' : 'rawGraphPath',
'graphOutputPath': 'firstCleanedGraphPath',
'isLookupUrl': 'isLookUpUrl'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/clean/oozie_app',
'workingPath' : '/tmp/beta_provision/working_dir/first_clean'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="duplicateScan"/>
</ARCS>
@ -337,7 +377,7 @@
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'actionSetId' : 'dedupConfig',
'graphBasePath' : 'rawGraphPath',
'graphBasePath' : 'firstCleanedGraphPath',
'dedupGraphPath': 'dedupGraphPath',
'isLookUpUrl' : 'isLookUpUrl'
}
