WIP: refactoring model utilities

Claudio Atzori 2023-02-02 17:02:23 +01:00
parent 1845dcfedf
commit 67735f7e9d
53 changed files with 638 additions and 888 deletions

View File

@ -10,6 +10,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@ -30,9 +32,7 @@ import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/**
@ -120,7 +120,7 @@ public class GroupEntitiesSparkJob {
private Entity mergeAndGet(Entity b, Entity a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.mergeEntities(b, a);
return MergeUtils.mergeEntities(b, a);
}
return Objects.isNull(a) ? b : a;
}
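A minimal sketch of the reducer contract above, assuming the Entity type introduced by this commit (the wrapper and variable names are illustrative, not from the commit):

static Entity mergeOrKeep(Entity existing, Entity incoming) {
	// both sides present: field-level merge via the new utility
	if (existing != null && incoming != null) {
		return MergeUtils.mergeEntities(existing, incoming);
	}
	// otherwise keep whichever side is non-null
	return incoming == null ? existing : incoming;
}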

View File

@ -1,7 +1,16 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance;
import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import me.xuender.unidecode.Unidecode;
import org.apache.commons.lang3.StringUtils;
import java.time.LocalDate;
import java.time.ZoneId;
@ -12,19 +21,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import me.xuender.unidecode.Unidecode;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance;
public class GraphCleaningFunctions extends CleaningFunctions {

View File

@ -17,6 +17,49 @@ import static com.google.common.base.Preconditions.checkArgument;
public class MergeUtils {
public static Oaf merge(final Oaf left, final Oaf right) {
if (ModelSupport.isSubClass(left, Entity.class)) {
return mergeEntities((Entity) left, (Entity) right);
} else if (ModelSupport.isSubClass(left, Relation.class)) {
return MergeUtils.mergeRelation((Relation) left, (Relation) right);
} else {
throw new IllegalArgumentException("invalid Oaf type:" + left.getClass().getCanonicalName());
}
}
public static Entity mergeEntities(Entity original, Entity enrich) {
if (ModelSupport.isSubClass(original, Result.class)) {
return mergeResults((Result) original, (Result) enrich);
} else if (ModelSupport.isSubClass(original, Datasource.class)) {
//TODO
return original;
} else if (ModelSupport.isSubClass(original, Organization.class)) {
return mergeOrganization((Organization) original, (Organization) enrich);
} else if (ModelSupport.isSubClass(original, Project.class)) {
return mergeProject((Project) original, (Project) enrich);
} else {
throw new IllegalArgumentException("invalid Entity subtype:" + original.getClass().getCanonicalName());
}
}
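A hedged usage sketch (the wrapper is illustrative, not part of the commit): the generic entry point dispatches on runtime type, while the typed overloads below now return concrete subtypes instead of Oaf.

static Oaf mergeAny(Oaf left, Oaf right) {
	// Entity subtypes route through mergeEntities(...), Relations through
	// mergeRelation(...); anything else throws IllegalArgumentException
	return MergeUtils.merge(left, right);
}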
public static Result mergeResults(Result original, Result enrich) {
final boolean leftFromDelegatedAuthority = isFromDelegatedAuthority(original);
final boolean rightFromDelegatedAuthority = isFromDelegatedAuthority(enrich);
if (leftFromDelegatedAuthority && !rightFromDelegatedAuthority) {
return original;
}
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
return enrich;
}
if (new ResultTypeComparator().compare(original, enrich) < 0) {
return MergeUtils.mergeResult(original, enrich);
} else {
return MergeUtils.mergeResult(enrich, original);
}
}
public static Result mergeResult(Result original, Result enrich) {
final Result mergedResult = (Result) mergeEntity(original, enrich);
@ -191,7 +234,7 @@ public class MergeUtils {
return mergedPublication;
}
public static Oaf mergeOrganization(Organization original, Organization enrich) {
public static Organization mergeOrganization(Organization original, Organization enrich) {
final Organization mergedOrganization = (Organization) mergeEntity(original, enrich);
@ -264,7 +307,7 @@ public class MergeUtils {
return mergedOrganization;
}
public static Oaf mergeOAFProject(Project original, Project enrich) {
public static Project mergeProject(Project original, Project enrich) {
final Project mergedProject = (Project) mergeEntity(original, enrich);
@ -364,7 +407,7 @@ public class MergeUtils {
return mergedProject;
}
private static Entity mergeEntity(Entity original, Entity enrich) {
public static Entity mergeEntity(Entity original, Entity enrich) {
final Entity mergedEntity = original;
@ -531,6 +574,18 @@ public class MergeUtils {
));
}
private static boolean isFromDelegatedAuthority(Result r) {
return Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.filter(i -> Objects.nonNull(i.getCollectedfrom()))
.map(i -> i.getCollectedfrom().getKey())
.anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId)))
.orElse(false);
}
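For context, a hedged sketch of what marks a Result as delegated-authority in mergeResults above; the wiring uses only setters appearing elsewhere in this commit, and the Zenodo key mirrors the testDelegatedAuthority case further down (the "ZENODO" label is illustrative):

Result r = new Result();
Instance i = new Instance();
// delegated authority is detected from the instance-level collectedfrom key
i.setCollectedfrom(OafMapperUtils.keyValue(ModelConstants.ZENODO_OD_ID, "ZENODO"));
r.setInstance(Arrays.asList(i));
// isFromDelegatedAuthority(r) -> true, assuming the Zenodo datasource id is
// among IdentifierFactory.delegatedAuthorityDatasourceIds(); such a record
// wins the merge regardless of the ResultTypeComparator ordering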
/**
* Valid pid boolean.
*

View File

@ -12,7 +12,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.*;
@ -22,58 +21,6 @@ public class OafMapperUtils {
private OafMapperUtils() {
}
public static Oaf merge(final Oaf left, final Oaf right) {
if (ModelSupport.isSubClass(left, Entity.class)) {
return mergeEntities((Entity) left, (Entity) right);
} else if (ModelSupport.isSubClass(left, Relation.class)) {
return MergeUtils.mergeRelation((Relation) left, (Relation) right);
} else {
throw new IllegalArgumentException("invalid Oaf type:" + left.getClass().getCanonicalName());
}
}
public static Entity mergeEntities(Entity left, Entity right) {
if (ModelSupport.isSubClass(left, Result.class)) {
return mergeResults((Result) left, (Result) right);
} else if (ModelSupport.isSubClass(left, Datasource.class) ||
ModelSupport.isSubClass(left, Organization.class) ||
ModelSupport.isSubClass(left, Project.class)) {
return (Entity) merge(left, right);
} else {
throw new IllegalArgumentException("invalid Entity subtype:" + left.getClass().getCanonicalName());
}
}
public static Result mergeResults(Result left, Result right) {
final boolean leftFromDelegatedAuthority = isFromDelegatedAuthority(left);
final boolean rightFromDelegatedAuthority = isFromDelegatedAuthority(right);
if (leftFromDelegatedAuthority && !rightFromDelegatedAuthority) {
return left;
}
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
return right;
}
if (new ResultTypeComparator().compare(left, right) < 0) {
return MergeUtils.mergeResult(left, right);
} else {
return MergeUtils.mergeResult(right, left);
}
}
private static boolean isFromDelegatedAuthority(Result r) {
return Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.filter(i -> Objects.nonNull(i.getCollectedfrom()))
.map(i -> i.getCollectedfrom().getKey())
.anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId)))
.orElse(false);
}
public static KeyValue keyValue(final String k, final String v) {
final KeyValue kv = new KeyValue();
kv.setKey(k);
@ -421,20 +368,21 @@ public class OafMapperUtils {
return null;
}
public static KeyValue newKeyValueInstance(String key, String value, DataInfo dataInfo) {
KeyValue kv = new KeyValue();
kv.setKey(key);
kv.setValue(value);
return kv;
}
public static Measure newMeasureInstance(String id, String value, String key, DataInfo dataInfo) {
Measure m = new Measure();
m.setId(id);
m.setUnit(Arrays.asList(newKeyValueInstance(key, value, dataInfo)));
m.setUnit(Arrays.asList(unit(key, value, dataInfo)));
return m;
}
public static MeasureUnit unit(String key, String value, DataInfo dataInfo) {
MeasureUnit unit = new MeasureUnit();
unit.setKey(key);
unit.setValue(value);
unit.setDataInfo(dataInfo);
return unit;
}
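A short example of the new factory pair, with hypothetical id/key/value strings and no DataInfo:

// builds a Measure whose unit list contains unit("count", "42", null)
Measure downloads = OafMapperUtils.newMeasureInstance("downloads", "42", "count", null);
// the same MeasureUnit can also be created directly
MeasureUnit u = OafMapperUtils.unit("count", "42", null);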
public static Relation getRelation(final String source,
final String target,
final String relType,

View File

@ -0,0 +1,97 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class MergeUtilsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@Test
void testMergePubs() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
Dataset d2 = read("dataset_2.json", Dataset.class);
assertEquals(1, p1.getCollectedfrom().size());
assertEquals(ModelConstants.CROSSREF_ID, p1.getCollectedfrom().get(0).getKey());
assertEquals(1, d2.getCollectedfrom().size());
assertFalse(cfId(d2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(1, p2.getCollectedfrom().size());
assertFalse(cfId(p2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(1, d1.getCollectedfrom().size());
assertTrue(cfId(d1.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
final Result p1d2 = MergeUtils.mergeResults(p1, d2);
assertEquals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, p1d2.getResulttype());
assertTrue(p1d2 instanceof Publication);
assertEquals(p1.getId(), p1d2.getId());
}
@Test
void testMergePubs_1() throws IOException {
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
final Result p2d1 = MergeUtils.mergeResults(p2, d1);
assertEquals(ModelConstants.DATASET_RESULTTYPE_CLASSID, p2d1.getResulttype());
assertTrue(p2d1 instanceof Dataset);
assertEquals(d1.getId(), p2d1.getId());
assertEquals(2, p2d1.getCollectedfrom().size());
}
@Test
void testMergePubs_2() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Result p1p2 = MergeUtils.mergeResults(p1, p2);
assertTrue(p1p2 instanceof Publication);
assertEquals(p1.getId(), p1p2.getId());
assertEquals(2, p1p2.getCollectedfrom().size());
}
@Test
void testDelegatedAuthority() throws IOException {
Dataset d1 = read("dataset_2.json", Dataset.class);
Dataset d2 = read("dataset_delegated.json", Dataset.class);
assertEquals(1, d2.getCollectedfrom().size());
assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
Result res = MergeUtils.mergeResults(d1, d2);
assertEquals(d2, res);
System.out.println(OBJECT_MAPPER.writeValueAsString(res));
}
protected HashSet<String> cfId(List<KeyValue> collectedfrom) {
return collectedfrom.stream().map(KeyValue::getKey).collect(Collectors.toCollection(HashSet::new));
}
protected <T extends Result> T read(String filename, Class<T> clazz) throws IOException {
final String json = IOUtils.toString(getClass().getResourceAsStream(filename));
return OBJECT_MAPPER.readValue(json, clazz);
}
}

View File

@ -152,72 +152,6 @@ class OafMapperUtilsTest {
System.out.println(date);
}
@Test
void testMergePubs() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
Dataset d2 = read("dataset_2.json", Dataset.class);
assertEquals(1, p1.getCollectedfrom().size());
assertEquals(ModelConstants.CROSSREF_ID, p1.getCollectedfrom().get(0).getKey());
assertEquals(1, d2.getCollectedfrom().size());
assertFalse(cfId(d2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(1, p2.getCollectedfrom().size());
assertFalse(cfId(p2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(1, d1.getCollectedfrom().size());
assertTrue(cfId(d1.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
final Result p1d2 = OafMapperUtils.mergeResults(p1, d2);
assertEquals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, p1d2.getResulttype());
assertTrue(p1d2 instanceof Publication);
assertEquals(p1.getId(), p1d2.getId());
}
@Test
void testMergePubs_1() throws IOException {
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
final Result p2d1 = OafMapperUtils.mergeResults(p2, d1);
assertEquals(ModelConstants.DATASET_RESULTTYPE_CLASSID, p2d1.getResulttype());
assertTrue(p2d1 instanceof Dataset);
assertEquals(d1.getId(), p2d1.getId());
assertEquals(2, p2d1.getCollectedfrom().size());
}
@Test
void testMergePubs_2() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Result p1p2 = OafMapperUtils.mergeResults(p1, p2);
assertTrue(p1p2 instanceof Publication);
assertEquals(p1.getId(), p1p2.getId());
assertEquals(2, p1p2.getCollectedfrom().size());
}
@Test
void testDelegatedAuthority() throws IOException {
Dataset d1 = read("dataset_2.json", Dataset.class);
Dataset d2 = read("dataset_delegated.json", Dataset.class);
assertEquals(1, d2.getCollectedfrom().size());
assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
Result res = OafMapperUtils.mergeResults(d1, d2);
assertEquals(d2, res);
System.out.println(OBJECT_MAPPER.writeValueAsString(res));
}
protected HashSet<String> cfId(List<KeyValue> collectedfrom) {
return collectedfrom.stream().map(KeyValue::getKey).collect(Collectors.toCollection(HashSet::new));
}
protected <T extends Result> T read(String filename, Class<T> clazz) throws IOException {
final String json = IOUtils.toString(getClass().getResourceAsStream(filename));
return OBJECT_MAPPER.readValue(json, clazz);

View File

@ -1,14 +1,14 @@
package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import static eu.dnetlib.dhp.schema.oaf.common.ModelSupport.isSubClass;
import java.util.function.BiFunction;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
/** OAF model merging support. */
public class MergeAndGet {
@ -47,13 +47,23 @@ public class MergeAndGet {
private static <G extends Oaf, A extends Oaf> G mergeFromAndGet(G x, A y) {
if (isSubClass(x, Relation.class) && isSubClass(y, Relation.class)) {
((Relation) x).mergeFrom((Relation) y);
return x;
} else if (isSubClass(x, OafEntity.class)
&& isSubClass(y, OafEntity.class)
return (G) MergeUtils.mergeRelation((Relation) x, (Relation) y);
} else if (isSubClass(x, Result.class)
&& isSubClass(y, Result.class)
&& isSubClass(x, y)) {
((OafEntity) x).mergeFrom((OafEntity) y);
return x;
return (G) MergeUtils.mergeResult((Result) x, (Result) y);
} else if (isSubClass(x, Datasource.class)
&& isSubClass(y, Datasource.class)
&& isSubClass(x, y)) {
throw new RuntimeException("MERGE_FROM_AND_GET should not deal with Datasource types");
} else if (isSubClass(x, Organization.class)
&& isSubClass(y, Organization.class)
&& isSubClass(x, y)) {
return (G) MergeUtils.mergeOrganization((Organization) x, (Organization) y);
} else if (isSubClass(x, Project.class)
&& isSubClass(y, Project.class)
&& isSubClass(x, y)) {
return (G) MergeUtils.mergeProject((Project) x, (Project) y);
}
throw new RuntimeException(
String
@ -64,20 +74,26 @@ public class MergeAndGet {
@SuppressWarnings("unchecked")
private static <G extends Oaf, A extends Oaf> G selectNewerAndGet(G x, A y) {
if (x.getClass().equals(y.getClass())
&& x.getLastupdatetimestamp() > y.getLastupdatetimestamp()) {
return x;
} else if (x.getClass().equals(y.getClass())
&& x.getLastupdatetimestamp() < y.getLastupdatetimestamp()) {
return (G) y;
} else if (isSubClass(x, y) && x.getLastupdatetimestamp() > y.getLastupdatetimestamp()) {
return x;
} else if (isSubClass(x, y) && x.getLastupdatetimestamp() < y.getLastupdatetimestamp()) {
throw new RuntimeException(
String
.format(
"SELECT_NEWER_AND_GET cannot return right type when it is not the same as left type: %s, %s",
x.getClass().getCanonicalName(), y.getClass().getCanonicalName()));
if (isSubClass(x, Entity.class) && isSubClass(x, Entity.class)) {
Entity xE = (Entity) x;
Entity yE = (Entity) y;
if (xE.getClass().equals(yE.getClass())
&& xE.getLastupdatetimestamp() > yE.getLastupdatetimestamp()) {
return x;
} else if (xE.getClass().equals(yE.getClass())
&& xE.getLastupdatetimestamp() < yE.getLastupdatetimestamp()) {
return (G) y;
} else if (isSubClass(xE, yE) && xE.getLastupdatetimestamp() > yE.getLastupdatetimestamp()) {
return x;
} else if (isSubClass(xE, yE) && xE.getLastupdatetimestamp() < yE.getLastupdatetimestamp()) {
throw new RuntimeException(
String
.format(
"SELECT_NEWER_AND_GET cannot return right type when it is not the same as left type: %s, %s",
x.getClass().getCanonicalName(), y.getClass().getCanonicalName()));
}
}
throw new RuntimeException(
String
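In short, after this refactoring SELECT_NEWER_AND_GET only resolves pairs of Entity instances. A simplified sketch of the same-class branch (illustrative wrapper; ties and unrelated types fall through to an exception in the code above):

static Entity selectNewerSameClass(Entity x, Entity y) {
	// same concrete class: the larger lastupdatetimestamp wins
	if (x.getClass().equals(y.getClass())
		&& !x.getLastupdatetimestamp().equals(y.getLastupdatetimestamp())) {
		return x.getLastupdatetimestamp() > y.getLastupdatetimestamp() ? x : y;
	}
	throw new IllegalArgumentException("cannot select the newer entity");
}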

View File

@ -2,13 +2,13 @@
package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import java.io.IOException;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@ -26,7 +26,7 @@ import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
/** Applies a given action payload file to graph table of compatible type. */
@ -104,7 +104,7 @@ public class PromoteActionPayloadForGraphTableJob {
private static void throwIfGraphTableClassIsNotSubClassOfActionPayloadClass(
Class<? extends Oaf> rowClazz, Class<? extends Oaf> actionPayloadClazz) {
if (!isSubClass(rowClazz, actionPayloadClazz)) {
if (!ModelSupport.isSubClass(rowClazz, actionPayloadClazz)) {
String msg = String
.format(
"graph table class is not a subclass of action payload class: graph=%s, action=%s",
@ -242,11 +242,11 @@ public class PromoteActionPayloadForGraphTableJob {
private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSourceAndTarget() {
return t -> {
if (isSubClass(t, Relation.class)) {
if (ModelSupport.isSubClass(t, Relation.class)) {
final Relation rel = (Relation) t;
return StringUtils.isNotBlank(rel.getSource()) && StringUtils.isNotBlank(rel.getTarget());
}
return StringUtils.isNotBlank(((OafEntity) t).getId());
return StringUtils.isNotBlank(((Entity) t).getId());
};
}
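A hedged illustration of the emptiness check above (ids hypothetical): a Relation needs both endpoints, any other row just a non-blank identifier.

Relation rel = new Relation();
rel.setSource("50|doi_________::aaaa");
rel.setTarget("50|doi_________::bbbb");
// the predicate keeps this row because both endpoints are non-blank;
// a non-Relation row is kept when StringUtils.isNotBlank(((Entity) t).getId())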

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import static eu.dnetlib.dhp.schema.oaf.common.ModelSupport.isSubClass;
import java.util.Objects;
import java.util.Optional;

View File

@ -8,6 +8,7 @@ import static org.mockito.Mockito.*;
import java.util.function.BiFunction;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@ -49,7 +50,7 @@ public class MergeAndGetTest {
void shouldThrowForOafAndOafEntity() {
// given
Oaf a = mock(Oaf.class);
OafEntity b = mock(OafEntity.class);
Entity b = mock(Entity.class);
// when
SerializableSupplier<BiFunction<Oaf, Oaf, Oaf>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
@ -75,7 +76,7 @@ public class MergeAndGetTest {
void shouldThrowForRelationAndOafEntity() {
// given
Relation a = mock(Relation.class);
OafEntity b = mock(OafEntity.class);
Entity b = mock(Entity.class);
// when
SerializableSupplier<BiFunction<Oaf, Oaf, Oaf>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
@ -96,14 +97,15 @@ public class MergeAndGetTest {
// then
Oaf x = fn.get().apply(a, b);
assertTrue(Relation.class.isAssignableFrom(x.getClass()));
verify(a).mergeFrom(b);
//verify(a).mergeFrom(b);
a = MergeUtils.mergeRelation(a, b);
assertEquals(a, x);
}
@Test
void shouldThrowForOafEntityAndOaf() {
// given
OafEntity a = mock(OafEntity.class);
Entity a = mock(Entity.class);
Oaf b = mock(Oaf.class);
// when
@ -116,7 +118,7 @@ public class MergeAndGetTest {
@Test
void shouldThrowForOafEntityAndRelation() {
// given
OafEntity a = mock(OafEntity.class);
Entity a = mock(Entity.class);
Relation b = mock(Relation.class);
// when
@ -129,9 +131,9 @@ public class MergeAndGetTest {
@Test
void shouldThrowForOafEntityAndOafEntityButNotSubclasses() {
// given
class OafEntitySub1 extends OafEntity {
class OafEntitySub1 extends Entity {
}
class OafEntitySub2 extends OafEntity {
class OafEntitySub2 extends Entity {
}
OafEntitySub1 a = mock(OafEntitySub1.class);
@ -147,16 +149,16 @@ public class MergeAndGetTest {
@Test
void shouldBehaveProperlyForOafEntityAndOafEntity() {
// given
OafEntity a = mock(OafEntity.class);
OafEntity b = mock(OafEntity.class);
Entity a = mock(Entity.class);
Entity b = mock(Entity.class);
// when
SerializableSupplier<BiFunction<Oaf, Oaf, Oaf>> fn = functionFor(Strategy.MERGE_FROM_AND_GET);
// then
Oaf x = fn.get().apply(a, b);
assertTrue(OafEntity.class.isAssignableFrom(x.getClass()));
verify(a).mergeFrom(b);
assertTrue(Entity.class.isAssignableFrom(x.getClass()));
a = MergeUtils.mergeEntity(a, b);
assertEquals(a, x);
}
}
@ -167,7 +169,7 @@ public class MergeAndGetTest {
@Test
void shouldThrowForOafEntityAndRelation() {
// given
OafEntity a = mock(OafEntity.class);
Entity a = mock(Entity.class);
Relation b = mock(Relation.class);
// when
@ -181,7 +183,7 @@ public class MergeAndGetTest {
void shouldThrowForRelationAndOafEntity() {
// given
Relation a = mock(Relation.class);
OafEntity b = mock(OafEntity.class);
Entity b = mock(Entity.class);
// when
SerializableSupplier<BiFunction<Oaf, Oaf, Oaf>> fn = functionFor(Strategy.SELECT_NEWER_AND_GET);
@ -193,7 +195,7 @@ public class MergeAndGetTest {
@Test
void shouldThrowForOafEntityAndResult() {
// given
OafEntity a = mock(OafEntity.class);
Entity a = mock(Entity.class);
Result b = mock(Result.class);
// when
@ -223,9 +225,9 @@ public class MergeAndGetTest {
@Test
void shouldShouldReturnLeftForOafEntityAndOafEntity() {
// given
OafEntity a = mock(OafEntity.class);
Entity a = mock(Entity.class);
when(a.getLastupdatetimestamp()).thenReturn(1L);
OafEntity b = mock(OafEntity.class);
Entity b = mock(Entity.class);
when(b.getLastupdatetimestamp()).thenReturn(2L);
// when
@ -233,16 +235,16 @@ public class MergeAndGetTest {
// then
Oaf x = fn.get().apply(a, b);
assertTrue(OafEntity.class.isAssignableFrom(x.getClass()));
assertTrue(Entity.class.isAssignableFrom(x.getClass()));
assertEquals(b, x);
}
@Test
void shouldShouldReturnRightForOafEntityAndOafEntity() {
// given
OafEntity a = mock(OafEntity.class);
Entity a = mock(Entity.class);
when(a.getLastupdatetimestamp()).thenReturn(2L);
OafEntity b = mock(OafEntity.class);
Entity b = mock(Entity.class);
when(b.getLastupdatetimestamp()).thenReturn(1L);
// when
@ -250,7 +252,7 @@ public class MergeAndGetTest {
// then
Oaf x = fn.get().apply(a, b);
assertTrue(OafEntity.class.isAssignableFrom(x.getClass()));
assertTrue(Entity.class.isAssignableFrom(x.getClass()));
assertEquals(a, x);
}
}

View File

@ -14,6 +14,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -27,7 +28,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class PromoteActionPayloadForGraphTableJobTest {
@ -80,7 +81,7 @@ public class PromoteActionPayloadForGraphTableJobTest {
void shouldThrowWhenGraphTableClassIsNotASubClassOfActionPayloadClass() {
// given
Class<Relation> rowClazz = Relation.class;
Class<OafEntity> actionPayloadClazz = OafEntity.class;
Class<Entity> actionPayloadClazz = Entity.class;
// when
RuntimeException exception = assertThrows(

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.actionmanager;
import java.util.Optional;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.EntityDataInfo;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -18,7 +20,6 @@ import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
public class Constants {
public static final String DOI = "doi";
public static final String DOI_CLASSNAME = "Digital Object Identifier";
public static final String DEFAULT_DELIMITER = ",";
@ -41,6 +42,58 @@ public class Constants {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final EntityDataInfo SciNoBo_DATA_INFO = OafMapperUtils
.dataInfo(
false,
false,
0.8f, //TODO check
"SciNoBo",
true,
OafMapperUtils
.qualifier(
ModelConstants.PROVENANCE_ENRICH,
null,
ModelConstants.DNET_PROVENANCE_ACTIONS));
public static final DataInfo Bip_DATA_INFO3 = OafMapperUtils
.dataInfo(
false,
false,
0.8f,
UPDATE_DATA_INFO_TYPE,
false,
OafMapperUtils
.qualifier(
UPDATE_MEASURE_BIP_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS));
public static final EntityDataInfo Bip_DATA_INFO2 = OafMapperUtils
.dataInfo(
false,
false,
0.8f,
UPDATE_DATA_INFO_TYPE,
true,
OafMapperUtils
.qualifier(
UPDATE_MEASURE_BIP_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS));
public static final EntityDataInfo Bip_DATA_INFO = OafMapperUtils
.dataInfo(
false,
false,
0.8f, //TODO check
UPDATE_DATA_INFO_TYPE,
true,
OafMapperUtils
.qualifier(
ModelConstants.PROVENANCE_ENRICH,
null,
ModelConstants.DNET_PROVENANCE_ACTIONS));
private Constants() {
}
@ -71,23 +124,19 @@ public class Constants {
.qualifier(
classid,
classname,
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
ModelConstants.DNET_SUBJECT_TYPOLOGIES));
s
.setDataInfo(
OafMapperUtils
.dataInfo(
false,
0.0f, //TODO check
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils
.qualifier(
diqualifierclassid,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
""));
ModelConstants.DNET_PROVENANCE_ACTIONS)));
return s;

View File

@ -40,7 +40,6 @@ import scala.Tuple2;
*/
public class SparkAtomicActionScoreJob implements Serializable {
private static final String DOI = "doi";
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -97,7 +96,6 @@ public class SparkAtomicActionScoreJob implements Serializable {
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
bipScores
.map((MapFunction<BipScore, Result>) bs -> {
Result ret = new Result();
@ -129,25 +127,11 @@ public class SparkAtomicActionScoreJob implements Serializable {
.getUnit()
.stream()
.map(unit -> {
KeyValue kv = new KeyValue();
kv.setValue(unit.getValue());
kv.setKey(unit.getKey());
kv
.setDataInfo(
OafMapperUtils
.dataInfo(
false,
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils
.qualifier(
UPDATE_MEASURE_BIP_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
""));
return kv;
MeasureUnit u = new MeasureUnit();
u.setValue(unit.getValue());
u.setKey(unit.getKey());
u.setDataInfo(Bip_DATA_INFO3);
return u;
})
.collect(Collectors.toList()));
return m;

View File

@ -11,6 +11,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -29,10 +31,6 @@ import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
@ -96,12 +94,12 @@ public class PrepareBipFinder implements Serializable {
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class))
.map((MapFunction<BipScore, Result>) v -> {
Result r = new Result();
final String cleanedPid = CleaningFunctions.normalizePidValue(DOI, v.getId());
final String cleanedPid = CleaningFunctions.normalizePidValue(PidType.doi.toString(), v.getId());
r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI));
r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), PidType.doi.toString()));
Instance inst = new Instance();
inst.setMeasures(getMeasure(v));
/*
inst
.setPid(
Arrays
@ -111,11 +109,15 @@ public class PrepareBipFinder implements Serializable {
cleanedPid,
OafMapperUtils
.qualifier(
DOI, DOI_CLASSNAME,
PidType.doi.toString(), DOI_CLASSNAME,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES),
null)));
*/
r.setInstance(Arrays.asList(inst));
/*
r
.setDataInfo(
OafMapperUtils
@ -129,6 +131,8 @@ public class PrepareBipFinder implements Serializable {
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
null));
*/
return r;
}, Encoders.bean(Result.class))
.write()
@ -150,9 +154,10 @@ public class PrepareBipFinder implements Serializable {
.getUnit()
.stream()
.map(unit -> {
KeyValue kv = new KeyValue();
kv.setValue(unit.getValue());
kv.setKey(unit.getKey());
MeasureUnit u = new MeasureUnit();
u.setValue(unit.getValue());
u.setKey(unit.getKey());
/*
kv
.setDataInfo(
OafMapperUtils
@ -168,7 +173,9 @@ public class PrepareBipFinder implements Serializable {
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
""));
return kv;
*/
return u;
})
.collect(Collectors.toList()));
return m;

View File

@ -8,6 +8,8 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.EntityDataInfo;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -60,7 +62,6 @@ public class PrepareFOSSparkJob implements Serializable {
distributeFOSdois(
spark,
sourcePath,
outputPath);
});
}
@ -73,7 +74,7 @@ public class PrepareFOSSparkJob implements Serializable {
.mapGroups((MapGroupsFunction<String, FOSDataModel, Result>) (k, it) -> {
Result r = new Result();
FOSDataModel first = it.next();
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
r.setId(DHPUtils.generateUnresolvedIdentifier(k, PidType.doi.toString()));
HashSet<String> level1 = new HashSet<>();
HashSet<String> level2 = new HashSet<>();
@ -85,19 +86,7 @@ public class PrepareFOSSparkJob implements Serializable {
level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
r.setSubject(sbjs);
r
.setDataInfo(
OafMapperUtils
.dataInfo(
false, null, true,
false,
OafMapperUtils
.qualifier(
ModelConstants.PROVENANCE_ENRICH,
null,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
null));
r.setDataInfo(SciNoBo_DATA_INFO);
return r;
}, Encoders.bean(Result.class))
.write()

View File

@ -8,6 +8,8 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.schema.oaf.EntityDataInfo;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -60,7 +62,6 @@ public class PrepareSDGSparkJob implements Serializable {
doPrepare(
spark,
sourcePath,
outputPath);
});
}
@ -72,7 +73,7 @@ public class PrepareSDGSparkJob implements Serializable {
.groupByKey((MapFunction<SDGDataModel, String>) r -> r.getDoi().toLowerCase(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, SDGDataModel, Result>) (k, it) -> {
Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
r.setId(DHPUtils.generateUnresolvedIdentifier(k, PidType.doi.toString()));
SDGDataModel first = it.next();
List<Subject> sbjs = new ArrayList<>();
sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID));
@ -81,19 +82,7 @@ public class PrepareSDGSparkJob implements Serializable {
s -> sbjs
.add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
r.setSubject(sbjs);
r
.setDataInfo(
OafMapperUtils
.dataInfo(
false, null, true,
false,
OafMapperUtils
.qualifier(
ModelConstants.PROVENANCE_ENRICH,
null,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
null));
r.setDataInfo(SciNoBo_DATA_INFO);
return r;
}, Encoders.bean(Result.class))
.write()

View File

@ -7,6 +7,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
@ -26,7 +28,6 @@ import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
@ -36,7 +37,7 @@ public class CreateActionSetSparkJob implements Serializable {
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
private static final String ID_PREFIX = "50|doi_________::";
private static final String TRUST = "0.91";
private static final Float TRUST = 0.91f;
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -145,46 +146,31 @@ public class CreateActionSetSparkJob implements Serializable {
String target,
String relclass) {
Relation r = new Relation();
r.setCollectedfrom(getCollectedFrom());
r.setProvenance(getProvenance());
r.setSource(source);
r.setTarget(target);
r.setRelClass(relclass);
r.setRelType(ModelConstants.RESULT_RESULT);
r.setSubRelType(ModelConstants.CITATION);
r
.setDataInfo(
getDataInfo());
return r;
}
public static List<KeyValue> getCollectedFrom() {
private static List<Provenance> getProvenance() {
return Arrays.asList(OafMapperUtils.getProvenance(getCollectedFrom(), getDataInfo()));
}
public static KeyValue getCollectedFrom() {
KeyValue kv = new KeyValue();
kv.setKey(ModelConstants.OPENOCITATIONS_ID);
kv.setValue(ModelConstants.OPENOCITATIONS_NAME);
return Arrays.asList(kv);
return kv;
}
public static DataInfo getDataInfo() {
DataInfo di = new DataInfo();
di.setInferred(false);
di.setDeletedbyinference(false);
di.setTrust(TRUST);
di
.setProvenanceaction(
getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS));
return di;
return OafMapperUtils.dataInfo(TRUST, null, false,
OafMapperUtils.qualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS));
}
public static Qualifier getQualifier(String class_id, String class_name,
String qualifierSchema) {
Qualifier pa = new Qualifier();
pa.setClassid(class_id);
pa.setClassname(class_name);
pa.setSchemeid(qualifierSchema);
pa.setSchemename(qualifierSchema);
return pa;
}
}
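A hedged sketch of the relation shape produced above; the endpoint ids and the relclass literal are illustrative, the rest uses only methods from this hunk:

Relation r = new Relation();
r.setProvenance(getProvenance()); // collectedfrom + dataInfo, now wrapped in Provenance
r.setSource("50|doi_________::citing"); // hypothetical ids
r.setTarget("50|doi_________::cited");
r.setRelClass("Cites"); // assumption: the citation relclass supplied by the caller
r.setRelType(ModelConstants.RESULT_RESULT);
r.setSubRelType(ModelConstants.CITATION);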

View File

@ -7,6 +7,8 @@ import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@ -27,10 +29,9 @@ import eu.dnetlib.dhp.actionmanager.project.utils.model.EXCELTopic;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.H2020Classification;
import eu.dnetlib.dhp.schema.oaf.H2020Programme;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Entity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
@ -153,11 +154,13 @@ public class SparkAtomicActionJob {
}, Encoders.bean(Project.class))
.filter(Objects::nonNull)
.groupByKey(
(MapFunction<Project, String>) OafEntity::getId,
(MapFunction<Project, String>) Entity::getId,
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
Project first = it.next();
it.forEachRemaining(first::mergeFrom);
while (it.hasNext()) {
first = MergeUtils.mergeProject(first, it.next());
}
return first;
}, Encoders.bean(Project.class))
.toJavaRDD()

View File

@ -4,7 +4,6 @@ package eu.dnetlib.dhp.actionmanager.ror;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
@ -21,6 +20,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@ -43,13 +43,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
@ -64,11 +57,11 @@ public class GenerateRorActionSetJob {
private static final List<KeyValue> ROR_COLLECTED_FROM = listKeyValues(
"10|openaire____::993a7ae7a863813cf95028b50708e222", "ROR");
private static final DataInfo ROR_DATA_INFO = dataInfo(
false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92");
private static final EntityDataInfo ROR_DATA_INFO = dataInfo(
false, false, 0.92f, null, false, ENTITYREGISTRY_PROVENANCE_ACTION);
private static final Qualifier ROR_PID_TYPE = qualifier(
"ROR", "ROR", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES);
"ROR", "ROR", ModelConstants.DNET_PID_TYPES);
public static void main(final String[] args) throws Exception {
@ -132,11 +125,10 @@ public class GenerateRorActionSetJob {
o.setDateofcollection(now.toString());
o.setDateoftransformation(now.toString());
o.setExtraInfo(new ArrayList<>()); // Values not present in the file
o.setOaiprovenance(null); // Values not present in the file
o.setLegalshortname(field(r.getAcronyms().stream().findFirst().orElse(r.getName()), ROR_DATA_INFO));
o.setLegalname(field(r.getName(), ROR_DATA_INFO));
o.setLegalshortname(r.getAcronyms().stream().findFirst().orElse(r.getName()));
o.setLegalname(r.getName());
o.setAlternativeNames(alternativeNames(r));
o.setWebsiteurl(field(r.getLinks().stream().findFirst().orElse(null), ROR_DATA_INFO));
o.setWebsiteurl(r.getLinks().stream().findFirst().orElse(null));
o.setLogourl(null);
o.setEclegalbody(null);
o.setEclegalperson(null);
@ -155,7 +147,7 @@ public class GenerateRorActionSetJob {
r.getCountry().getCountryCode(), r
.getCountry()
.getCountryName(),
ModelConstants.DNET_COUNTRY_TYPE, ModelConstants.DNET_COUNTRY_TYPE));
ModelConstants.DNET_COUNTRY_TYPE));
} else {
o.setCountry(null);
}
@ -175,17 +167,17 @@ public class GenerateRorActionSetJob {
private static List<StructuredProperty> pids(final RorOrganization r) {
final List<StructuredProperty> pids = new ArrayList<>();
pids.add(structuredProperty(r.getId(), ROR_PID_TYPE, ROR_DATA_INFO));
pids.add(structuredProperty(r.getId(), ROR_PID_TYPE));
for (final Map.Entry<String, ExternalIdType> e : r.getExternalIds().entrySet()) {
final String type = e.getKey();
final List<String> all = e.getValue().getAll();
if (all != null) {
final Qualifier qualifier = qualifier(
type, type, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES);
type, type, ModelConstants.DNET_PID_TYPES);
for (final String pid : all) {
pids
.add(structuredProperty(pid, qualifier, ROR_DATA_INFO));
.add(structuredProperty(pid, qualifier));
}
}
}
@ -193,7 +185,7 @@ public class GenerateRorActionSetJob {
return pids;
}
private static List<Field<String>> alternativeNames(final RorOrganization r) {
private static List<String> alternativeNames(final RorOrganization r) {
final Set<String> names = new LinkedHashSet<>();
names.addAll(r.getAliases());
names.addAll(r.getAcronyms());
@ -202,7 +194,6 @@ public class GenerateRorActionSetJob {
return names
.stream()
.filter(StringUtils::isNotBlank)
.map(s -> field(s, ROR_DATA_INFO))
.collect(Collectors.toList());
}
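A minimal example of the slimmer factory calls used above, with a hypothetical ROR id; both overloads (2-arg structuredProperty, 3-arg qualifier) appear in this hunk:

Qualifier rorType = qualifier("ROR", "ROR", ModelConstants.DNET_PID_TYPES);
StructuredProperty pid = structuredProperty("https://ror.org/02mhbdp94", rorType);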

View File

@ -121,17 +121,14 @@ public class SparkAtomicActionUsageJob implements Serializable {
private static List<Measure> getMeasure(Long downloads, Long views) {
DataInfo dataInfo = OafMapperUtils
.dataInfo(
false,
0.0f, //TODO check
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils
.qualifier(
UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"");
ModelConstants.DNET_PROVENANCE_ACTIONS));
return Arrays
.asList(

View File

@ -11,6 +11,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
@ -216,7 +217,8 @@ public class GenerateNativeStoreSparkJob {
invalidRecords.add(1);
return null;
}
return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
final String id = ModelSupport.generateIdentifier(originalIdentifier, provenance.getNsPrefix());
return new MetadataRecord(id, originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
} catch (Throwable e) {
invalidRecords.add(1);
return null;
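A hedged example of the new identifier derivation (values hypothetical):

// the record id now combines the original identifier with the
// provenance namespace prefix
final String id = ModelSupport.generateIdentifier("oai:example.org:1234", "od______1234");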

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.collection
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Entity, Oaf, Entity, Relation}
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Entity, Oaf, Relation}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode}
object CollectionUtils {
@ -35,7 +35,6 @@ object CollectionUtils {
inverse.setSubRelType(currentRel.getSubReltype)
inverse.setRelClass(currentRel.getInverseRelClass)
inverse.setProvenance(r.getProvenance)
inverse.setDataInfo(r.getDataInfo)
inverse.setProperties(r.getProperties)
inverse.setValidated(r.getValidated)
inverse.setValidationDate(r.getValidationDate)

View File

@ -459,12 +459,12 @@ object DataciteToOAFTransformation {
} else if (publication_year != null) {
val date = s"01-01-$publication_year"
if (doi.startsWith("10.14457")) {
val date = fix_thai_date(date, "[dd-MM-yyyy]")
result.setDateofacceptance(date)
val fdate = fix_thai_date(date, "[dd-MM-yyyy]")
result.setDateofacceptance(fdate)
result
.getInstance()
.get(0)
.setDateofacceptance(date)
.setDateofacceptance(fdate)
} else {
result.setDateofacceptance(date)
result
@ -636,7 +636,6 @@ object DataciteToOAFTransformation {
val rel = new Relation
rel.setProvenance(Lists.newArrayList(OafMapperUtils.getProvenance(DATACITE_COLLECTED_FROM, dataInfo)))
rel.setDataInfo(dataInfo)
val subRelType = subRelTypeMapping(r.relationType).relType
rel.setRelType(REL_TYPE_VALUE)

View File

@ -1,11 +1,13 @@
package eu.dnetlib.dhp.sx.bio
import com.google.common.collect.Lists
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, OafMapperUtils}
import eu.dnetlib.dhp.schema.oaf._
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import collection.JavaConverters._
object BioDBToOAF {
@ -34,13 +36,20 @@ object BioDBToOAF {
authors: List[String]
) {}
val DATA_INFO: DataInfo = OafMapperUtils.dataInfo(
false,
val REL_DATA_INFO: DataInfo = OafMapperUtils.dataInfo(
0.9f,
null,
false,
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER
)
val DATA_INFO: EntityDataInfo = OafMapperUtils.dataInfo(
false,
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER,
"0.9"
false,
0.9f,
null,
false,
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER
)
val SUBJ_CLASS = "Keywords"
@ -88,15 +97,6 @@ object BioDBToOAF {
val pubmedCollectedFrom: KeyValue =
OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
UNIPROTCollectedFrom.setDataInfo(DATA_INFO)
PDBCollectedFrom.setDataInfo(DATA_INFO)
ElsevierCollectedFrom.setDataInfo(DATA_INFO)
EBICollectedFrom.setDataInfo(DATA_INFO)
pubmedCollectedFrom.setDataInfo(DATA_INFO)
enaCollectedFrom.setDataInfo(DATA_INFO)
ncbiCollectedFrom.setDataInfo(DATA_INFO)
springerNatureCollectedFrom.setDataInfo(DATA_INFO)
Map(
"uniprot" -> UNIPROTCollectedFrom,
"pdb" -> PDBCollectedFrom,
@ -144,9 +144,7 @@ object BioDBToOAF {
input.pid.toLowerCase,
input.pidType.toLowerCase,
input.pidType.toLowerCase,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
DATA_INFO
ModelConstants.DNET_PID_TYPES
)
).asJava
)
@ -161,8 +159,7 @@ object BioDBToOAF {
List(
OafMapperUtils.structuredProperty(
input.tilte.head,
ModelConstants.MAIN_TITLE_QUALIFIER,
DATA_INFO
ModelConstants.MAIN_TITLE_QUALIFIER
)
).asJava
)
@ -181,7 +178,6 @@ object BioDBToOAF {
OafMapperUtils.qualifier(
"0037",
"Clinical Trial",
ModelConstants.DNET_PUBLICATION_RESOURCE,
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
@ -190,7 +186,6 @@ object BioDBToOAF {
OafMapperUtils.qualifier(
"0046",
"Bioentity",
ModelConstants.DNET_PUBLICATION_RESOURCE,
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
@ -213,8 +208,8 @@ object BioDBToOAF {
}
if (input.date != null && input.date.nonEmpty) {
val dt = input.date.head
i.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
d.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
i.setDateofacceptance(GraphCleaningFunctions.cleanDate(dt))
d.setDateofacceptance(GraphCleaningFunctions.cleanDate(dt))
}
d
}
@ -232,9 +227,7 @@ object BioDBToOAF {
pid,
"uniprot",
"uniprot",
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
DATA_INFO
ModelConstants.DNET_PID_TYPES
)
).asJava
)
@ -248,7 +241,7 @@ object BioDBToOAF {
if (title != null)
d.setTitle(
List(
OafMapperUtils.structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER, DATA_INFO)
OafMapperUtils.structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER)
).asJava
)
@ -261,7 +254,6 @@ object BioDBToOAF {
OafMapperUtils.qualifier(
"0046",
"Bioentity",
ModelConstants.DNET_PUBLICATION_RESOURCE,
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
@ -286,7 +278,6 @@ object BioDBToOAF {
SUBJ_CLASS,
SUBJ_CLASS,
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
null
)
)
@ -298,8 +289,8 @@ object BioDBToOAF {
if (dates.nonEmpty) {
i_date = dates.find(d => d.date_info.contains("entry version"))
if (i_date.isDefined) {
i.setDateofacceptance(OafMapperUtils.field(i_date.get.date, DATA_INFO))
d.setDateofacceptance(OafMapperUtils.field(i_date.get.date, DATA_INFO))
i.setDateofacceptance(i_date.get.date)
d.setDateofacceptance(i_date.get.date)
}
val relevant_dates: List[StructuredProperty] = dates
.filter(d => !d.date_info.contains("entry version"))
@ -308,14 +299,12 @@ object BioDBToOAF {
date.date,
ModelConstants.UNKNOWN,
ModelConstants.UNKNOWN,
ModelConstants.DNET_DATACITE_DATE,
ModelConstants.DNET_DATACITE_DATE,
DATA_INFO
ModelConstants.DNET_DATACITE_DATE
)
)
if (relevant_dates != null && relevant_dates.nonEmpty)
d.setRelevantdate(relevant_dates.asJava)
d.setDateofacceptance(OafMapperUtils.field(i_date.get.date, DATA_INFO))
d.setDateofacceptance(i_date.get.date)
}
val references_pmid: List[String] = for {
@ -338,7 +327,7 @@ object BioDBToOAF {
ModelConstants.IS_RELATED_TO,
if (i_date.isDefined) i_date.get.date else null
)
rel.getCollectedfrom
rel.getProvenance.asScala.map(p => p.getCollectedfrom)
List(d, rel)
} else if (references_doi != null && references_doi.nonEmpty) {
val rel = createRelation(
@ -370,8 +359,13 @@ object BioDBToOAF {
): Relation = {
val rel = new Relation
rel.setCollectedfrom(List(collectedFromMap("pdb")).asJava)
rel.setDataInfo(DATA_INFO)
val provenance = OafMapperUtils.getProvenance(Lists.newArrayList(
collectedFrom,
collectedFromMap("pdb")
), REL_DATA_INFO)
rel.setProvenance(provenance)
rel.setRelType(ModelConstants.RESULT_RESULT)
rel.setSubRelType(subRelType)
@ -383,9 +377,8 @@ object BioDBToOAF {
val dateProps: KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
rel.setProperties(List(dateProps).asJava)
rel.getTarget.startsWith("unresolved")
rel.setCollectedfrom(List(collectedFrom).asJava)
rel
}
@ -424,9 +417,7 @@ object BioDBToOAF {
pdb,
"pdb",
"Protein Data Bank Identifier",
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
DATA_INFO
ModelConstants.DNET_PID_TYPES
)
).asJava
)
@ -442,7 +433,7 @@ object BioDBToOAF {
return List()
d.setTitle(
List(
OafMapperUtils.structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER, DATA_INFO)
OafMapperUtils.structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER)
).asJava
)
@ -467,7 +458,6 @@ object BioDBToOAF {
OafMapperUtils.qualifier(
"0046",
"Bioentity",
ModelConstants.DNET_PUBLICATION_RESOURCE,
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
@ -535,8 +525,7 @@ object BioDBToOAF {
List(
OafMapperUtils.structuredProperty(
input.title,
ModelConstants.MAIN_TITLE_QUALIFIER,
DATA_INFO
ModelConstants.MAIN_TITLE_QUALIFIER
)
).asJava
)
@ -552,9 +541,7 @@ object BioDBToOAF {
input.targetPid.toLowerCase,
input.targetPidType.toLowerCase,
"Protein Data Bank Identifier",
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
DATA_INFO
ModelConstants.DNET_PID_TYPES
)
).asJava
)
@ -567,19 +554,14 @@ object BioDBToOAF {
OafMapperUtils.qualifier(
"0046",
"Bioentity",
ModelConstants.DNET_PUBLICATION_RESOURCE,
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
i.setCollectedfrom(collectedFromMap("ebi"))
d.setInstance(List(i).asJava)
i.setDateofacceptance(
OafMapperUtils.field(GraphCleaningFunctions.cleanDate(input.date), DATA_INFO)
)
d.setDateofacceptance(
OafMapperUtils.field(GraphCleaningFunctions.cleanDate(input.date), DATA_INFO)
)
i.setDateofacceptance(GraphCleaningFunctions.cleanDate(input.date))
d.setDateofacceptance(GraphCleaningFunctions.cleanDate(input.date))
List(
d,

View File

@ -25,13 +25,13 @@ object PubMedToOaf {
"doi" -> "https://dx.doi.org/"
)
val dataInfo: DataInfo = OafMapperUtils.dataInfo(
val ENTITY_DATAINFO: EntityDataInfo = OafMapperUtils.dataInfo(
false,
false,
0.9f,
null,
false,
false,
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER,
"0.9"
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER
)
val collectedFrom: KeyValue =
@ -98,14 +98,12 @@ object PubMedToOaf {
return null
val journal = new Journal
journal.setDataInfo(dataInfo)
journal.setName(j.getTitle)
journal.setConferencedate(j.getDate)
journal.setVol(j.getVolume)
journal.setIssnPrinted(j.getIssn)
journal.setIss(j.getIssue)
journal
}
/** Find vocabulary term into synonyms and term in the vocabulary
@ -143,9 +141,7 @@ object PubMedToOaf {
article.getPmid,
PidType.pmid.toString,
PidType.pmid.toString,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
dataInfo
ModelConstants.DNET_PID_TYPES
)
if (StringUtils.isNotBlank(article.getPmcId)) {
@ -153,9 +149,7 @@ object PubMedToOaf {
article.getPmcId,
PidType.pmc.toString,
PidType.pmc.toString,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
dataInfo
ModelConstants.DNET_PID_TYPES
)
}
if (pidList == null)
@ -170,9 +164,7 @@ object PubMedToOaf {
normalizedPid,
PidType.doi.toString,
PidType.doi.toString,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
dataInfo
ModelConstants.DNET_PID_TYPES
)
}
@ -200,7 +192,7 @@ object PubMedToOaf {
val result = createResult(pubmedInstance.getInstancetype, vocabularies)
if (result == null)
return result
result.setDataInfo(dataInfo)
result.setDataInfo(ENTITY_DATAINFO)
pubmedInstance.setPid(pidList.asJava)
if (alternateIdentifier != null)
pubmedInstance.setAlternateIdentifier(List(alternateIdentifier).asJava)
@ -218,9 +210,8 @@ object PubMedToOaf {
pubmedInstance.setUrl(urlLists.asJava)
//ASSIGN DateofAcceptance
pubmedInstance.setDateofacceptance(
OafMapperUtils.field(GraphCleaningFunctions.cleanDate(article.getDate), dataInfo)
)
pubmedInstance.setDateofacceptance(GraphCleaningFunctions.cleanDate(article.getDate))
//ASSIGN COLLECTEDFROM
pubmedInstance.setCollectedfrom(collectedFrom)
result.setPid(pidList.asJava)
@ -238,9 +229,7 @@ object PubMedToOaf {
// RESULT MAPPING
//--------------------------------------------------------------------------------------
result.setDateofacceptance(
OafMapperUtils.field(GraphCleaningFunctions.cleanDate(article.getDate), dataInfo)
)
result.setDateofacceptance(GraphCleaningFunctions.cleanDate(article.getDate))
if (article.getTitle == null || article.getTitle.isEmpty)
return null
@ -248,14 +237,13 @@ object PubMedToOaf {
List(
OafMapperUtils.structuredProperty(
article.getTitle,
ModelConstants.MAIN_TITLE_QUALIFIER,
dataInfo
ModelConstants.MAIN_TITLE_QUALIFIER
)
).asJava
)
if (article.getDescription != null && article.getDescription.nonEmpty)
result.setDescription(List(OafMapperUtils.field(article.getDescription, dataInfo)).asJava)
result.setDescription(List(article.getDescription).asJava)
if (article.getLanguage != null) {
@ -271,8 +259,7 @@ object PubMedToOaf {
SUBJ_CLASS,
SUBJ_CLASS,
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
dataInfo
ENTITY_DATAINFO
)
)(collection.breakOut)
if (subjects != null)

View File

@ -94,57 +94,6 @@ public class PrepareTest {
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count());
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size());
Assertions
.assertEquals(
3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size());
Assertions
.assertEquals(
"6.34596412687e-09", tmp
.filter(r -> r.getId().equals(doi1))
.collect()
.get(0)
.getInstance()
.get(0)
.getMeasures()
.stream()
.filter(sl -> sl.getId().equals("influence"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"0.641151896994", tmp
.filter(r -> r.getId().equals(doi1))
.collect()
.get(0)
.getInstance()
.get(0)
.getMeasures()
.stream()
.filter(sl -> sl.getId().equals("popularity_alt"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"2.33375102921e-09", tmp
.filter(r -> r.getId().equals(doi1))
.collect()
.get(0)
.getInstance()
.get(0)
.getMeasures()
.stream()
.filter(sl -> sl.getId().equals("popularity"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
final String doi2 = "unresolved::10.3390/s18072310::doi";

View File

@ -87,14 +87,8 @@ public class ProduceTest {
.forEach(
sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid()));
sbjs
.forEach(
sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename()));
sbjs.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference()));
sbjs.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred()));
sbjs.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getInvisible()));
sbjs.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust()));
sbjs.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance()));
sbjs
@ -109,49 +103,6 @@ public class ProduceTest {
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid()));
sbjs
.forEach(
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS,
sbj.getDataInfo().getProvenanceaction().getSchemename()));
}
@Test
void produceTestMeasuress() throws Exception {
JavaRDD<Result> tmp = getResultJavaRDD();
List<KeyValue> mes = tmp
.filter(row -> row.getInstance() != null && row.getInstance().size() > 0)
.flatMap(row -> row.getInstance().iterator())
.flatMap(i -> i.getMeasures().iterator())
.flatMap(m -> m.getUnit().iterator())
.collect();
mes.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference()));
mes.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred()));
mes.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getInvisible()));
mes.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust()));
mes.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance()));
mes
.forEach(
sbj -> Assertions.assertEquals("measure:bip", sbj.getDataInfo().getProvenanceaction().getClassid()));
mes
.forEach(
sbj -> Assertions
.assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname()));
mes
.forEach(
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid()));
mes
.forEach(
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS,
sbj.getDataInfo().getProvenanceaction().getSchemename()));
}
@Test
@ -191,107 +142,6 @@ public class ProduceTest {
}
@Test
void produceTest3Measures() throws Exception {
final String doi = "unresolved::10.3390/s18072310::doi";
JavaRDD<Result> tmp = getResultJavaRDD();
tmp
.filter(row -> row.getId().equals(doi))
.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
Assertions
.assertEquals(
3, tmp
.filter(row -> row.getId().equals(doi))
.collect()
.get(0)
.getInstance()
.get(0)
.getMeasures()
.size());
List<Measure> measures = tmp
.filter(row -> row.getId().equals(doi))
.flatMap(row -> row.getInstance().iterator())
.flatMap(inst -> inst.getMeasures().iterator())
.collect();
Assertions
.assertEquals(
"7.5597134689e-09", measures
.stream()
.filter(mes -> mes.getId().equals("influence"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"4.903880192", measures
.stream()
.filter(mes -> mes.getId().equals("popularity_alt"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"1.17977512835e-08", measures
.stream()
.filter(mes -> mes.getId().equals("popularity"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"10.3390/s18072310",
tmp
.filter(row -> row.getId().equals(doi))
.collect()
.get(0)
.getInstance()
.get(0)
.getPid()
.get(0)
.getValue()
.toLowerCase());
Assertions
.assertEquals(
"doi",
tmp
.filter(row -> row.getId().equals(doi))
.collect()
.get(0)
.getInstance()
.get(0)
.getPid()
.get(0)
.getQualifier()
.getClassid());
Assertions
.assertEquals(
"Digital Object Identifier",
tmp
.filter(row -> row.getId().equals(doi))
.collect()
.get(0)
.getInstance()
.get(0)
.getPid()
.get(0)
.getQualifier()
.getClassname());
}
@Test
void produceTestMeasures() throws Exception {
final String doi = "unresolved::10.3390/s18072310::doi";
@ -553,14 +403,8 @@ public class ProduceTest {
.forEach(
sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getInvisible()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance()));
sbjs_sdg
@ -575,12 +419,6 @@ public class ProduceTest {
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS,
sbj.getDataInfo().getProvenanceaction().getSchemename()));
}
}

View File

@ -7,6 +7,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
@ -164,8 +165,8 @@ public class CreateOpenCitationsASTest {
.map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> {
assertEquals(ModelConstants.OPENOCITATIONS_NAME, r.getCollectedfrom().get(0).getValue());
assertEquals(ModelConstants.OPENOCITATIONS_ID, r.getCollectedfrom().get(0).getKey());
assertEquals(ModelConstants.OPENOCITATIONS_NAME, r.getProvenance().get(0).getCollectedfrom().getValue());
assertEquals(ModelConstants.OPENOCITATIONS_ID, r.getProvenance().get(0).getCollectedfrom().getKey());
});
}
@ -197,15 +198,14 @@ public class CreateOpenCitationsASTest {
.map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> {
assertEquals(false, r.getDataInfo().getInferred());
assertEquals(false, r.getDataInfo().getDeletedbyinference());
assertEquals("0.91", r.getDataInfo().getTrust());
final DataInfo dataInfo = r.getProvenance().get(0).getDataInfo();
assertEquals(false, dataInfo.getInferred());
assertEquals("0.91", dataInfo.getTrust());
assertEquals(
CreateActionSetSparkJob.OPENCITATIONS_CLASSID, r.getDataInfo().getProvenanceaction().getClassid());
CreateActionSetSparkJob.OPENCITATIONS_CLASSID, dataInfo.getProvenanceaction().getClassid());
assertEquals(
CreateActionSetSparkJob.OPENCITATIONS_CLASSNAME, r.getDataInfo().getProvenanceaction().getClassname());
assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, r.getDataInfo().getProvenanceaction().getSchemeid());
assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, r.getDataInfo().getProvenanceaction().getSchemename());
CreateActionSetSparkJob.OPENCITATIONS_CLASSNAME, dataInfo.getProvenanceaction().getClassname());
assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, dataInfo.getProvenanceaction().getSchemeid());
});
}

View File

@ -50,7 +50,7 @@ class GenerateRorActionSetJobTest {
assertEquals("AU", o.getCountry().getClassid());
assertNotNull(o.getLegalname());
assertEquals("Mount Stromlo Observatory", o.getLegalname().getValue());
assertEquals("Mount Stromlo Observatory", o.getLegalname());
System.out.println(mapper.writeValueAsString(o));
}

View File

@ -83,16 +83,6 @@ public class SparkAtomicActionCountJobTest {
Assertions.assertEquals(9, tmp.count());
tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
tmp
.foreach(
r -> r
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
tmp
.foreach(
r -> r
@ -100,17 +90,6 @@ public class SparkAtomicActionCountJobTest {
.stream()
.forEach(
m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
tmp
.foreach(
r -> r
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
tmp
.foreach(
r -> r

View File

@ -16,6 +16,7 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.zookeeper.Op;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@ -129,8 +130,10 @@ abstract class AbstractSparkAction implements Serializable {
protected static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
for(Provenance prov : rel.getProvenance()) {
if (prov.getDataInfo() == null) {
prov.setDataInfo(new DataInfo());
}
}
return rel;
};
@ -138,20 +141,17 @@ abstract class AbstractSparkAction implements Serializable {
protected boolean isOpenorgs(Relation rel) {
return Optional
.ofNullable(rel.getCollectedfrom())
.map(c -> isCollectedFromOpenOrgs(c))
.orElse(false);
.ofNullable(rel.getProvenance())
.map(prov -> prov.stream().anyMatch(p -> isCollectedFromOpenOrgs(p.getCollectedfrom())))
.orElse(false);
}
protected boolean isOpenorgsDedupRel(Relation rel) {
return isOpenorgs(rel) && isOpenOrgsDedupMergeRelation(rel);
}
private boolean isCollectedFromOpenOrgs(List<KeyValue> c) {
return c
.stream()
.filter(Objects::nonNull)
.anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue()));
private boolean isCollectedFromOpenOrgs(KeyValue kv) {
return ModelConstants.OPENORGS_NAME.equals(kv.getValue());
}
private boolean isOpenOrgsDedupMergeRelation(Relation rel) {
@ -161,11 +161,11 @@ abstract class AbstractSparkAction implements Serializable {
ModelConstants.MERGES.equals(rel.getRelClass()));
}
protected static Boolean parseECField(Field<String> field) {
protected static Boolean parseECField(String field) {
if (field == null)
return null;
if (StringUtils.isBlank(field.getValue()) || field.getValue().equalsIgnoreCase("null"))
if (StringUtils.isBlank(field) || field.equalsIgnoreCase("null"))
return null;
return field.getValue().equalsIgnoreCase("true");
return field.equalsIgnoreCase("true");
}
}
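Note: parseECField now receives the EC flag as a plain String rather than a Field<String>. A standalone copy of the tri-state logic for illustration only (the class name is hypothetical):

import org.apache.commons.lang3.StringUtils;

public class ParseECFieldSketch {

    // null means "unknown", mirroring the method above
    static Boolean parseECField(String field) {
        if (field == null)
            return null;
        if (StringUtils.isBlank(field) || field.equalsIgnoreCase("null"))
            return null;
        return field.equalsIgnoreCase("true");
    }

    public static void main(String[] args) {
        System.out.println(parseECField("true"));  // true
        System.out.println(parseECField("FALSE")); // false
        System.out.println(parseECField("null"));  // null
    }
}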

View File

@ -14,8 +14,6 @@ import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import eu.dnetlib.dhp.schema.oaf.Field;
public class DatePicker {
public static final String DATE_PATTERN = "^(\\d{4})-(\\d{2})-(\\d{2})";
@ -26,7 +24,7 @@ public class DatePicker {
private DatePicker() {
}
public static Field<String> pick(final Collection<String> dateofacceptance) {
public static String pick(final Collection<String> dateofacceptance) {
final Map<String, Integer> frequencies = dateofacceptance
.parallelStream()
@ -35,11 +33,10 @@ public class DatePicker {
.collect(Collectors.toConcurrentMap(w -> w, w -> 1, Integer::sum));
if (frequencies.isEmpty()) {
return new Field<>();
return null;
}
final Field<String> date = new Field<>();
date.setValue(frequencies.keySet().iterator().next());
String date = frequencies.keySet().iterator().next();
// let's sort this map by values first, filtering out invalid dates
final Map<String, Integer> sorted = frequencies
@ -77,25 +74,22 @@ public class DatePicker {
.map(Map.Entry::getKey)
.findFirst();
if (first.isPresent()) {
date.setValue(first.get());
date = first.get();
return date;
}
date.setValue(sorted.keySet().iterator().next());
return date;
return sorted.keySet().iterator().next();
}
if (accepted.size() == 1) {
date.setValue(accepted.get(0));
return date;
return accepted.get(0);
} else {
final Optional<String> first = accepted
.stream()
.filter(d -> !endsWith(d, DATE_DEFAULT_SUFFIX))
.findFirst();
if (first.isPresent()) {
date.setValue(first.get());
return date;
return first.get();
}
return date;
@ -106,15 +100,13 @@ public class DatePicker {
if (sorted.size() == 2) {
for (Map.Entry<String, Integer> e : sorted.entrySet()) {
if (!endsWith(e.getKey(), DATE_DEFAULT_SUFFIX)) {
date.setValue(e.getKey());
return date;
return e.getKey();
}
}
}
// none of the dates seems good enough, return the 1st one
date.setValue(sorted.keySet().iterator().next());
return date;
return sorted.keySet().iterator().next();
}
}
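Note: DatePicker.pick now returns the chosen date as a plain String (or null for an empty collection) instead of a Field<String>. A usage sketch, consistent with the DatePickerTest expectations further below:

import java.util.Arrays;
import java.util.Collection;

import eu.dnetlib.dhp.oa.dedup.DatePicker;

public class DatePickerSketch {
    public static void main(String[] args) {
        // the most frequent well-formed date wins; callers no longer unwrap a Field
        Collection<String> dates = Arrays.asList("2016-02-01", "2016-02-01", "2020-10-01");
        String picked = DatePicker.pick(dates); // "2016-02-01"
        System.out.println(picked);
    }
}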

View File

@ -1,28 +1,26 @@
package eu.dnetlib.dhp.oa.dedup;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.stream.Collectors;
public class DedupRecordFactory {
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
@ -31,9 +29,9 @@ public class DedupRecordFactory {
private DedupRecordFactory() {
}
public static <T extends OafEntity> Dataset<T> createDedupRecord(
public static <T extends Entity> Dataset<T> createDedupRecord(
final SparkSession spark,
final DataInfo dataInfo,
final EntityDataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<T> clazz) {
@ -75,8 +73,8 @@ public class DedupRecordFactory {
Encoders.bean(clazz));
}
public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz)
public static <T extends Entity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, EntityDataInfo dataInfo, Class<T> clazz)
throws IllegalAccessException, InstantiationException, InvocationTargetException {
final Comparator<Identifier<T>> idComparator = new IdentifierComparator<>();
@ -89,24 +87,22 @@ public class DedupRecordFactory {
.map(Identifier::getEntity)
.collect(Collectors.toCollection(LinkedList::new));
final T entity = clazz.newInstance();
final T first = entityList.removeFirst();
T entity = clazz.newInstance();
T first = entityList.removeFirst();
BeanUtils.copyProperties(entity, first);
final List<List<Author>> authors = Lists.newArrayList();
for(Entity duplicate : entityList) {
entity = (T) MergeUtils.mergeEntities(entity, duplicate);
entityList
.forEach(
duplicate -> {
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
Optional
.ofNullable(r1.getAuthor())
.ifPresent(a -> authors.add(a));
}
});
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
Optional
.ofNullable(r1.getAuthor())
.ifPresent(a -> authors.add(a));
}
}
// set authors and date
if (ModelSupport.isSubClass(entity, Result.class)) {
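Note: the reflective mergeFrom call is replaced by an explicit fold over the duplicates through MergeUtils.mergeEntities. A minimal sketch of the pattern, assuming mergeEntities returns the left-hand entity enriched with the right-hand one (class and method names are hypothetical):

import java.util.LinkedList;

import eu.dnetlib.dhp.schema.oaf.Entity;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;

public class MergeFoldSketch {

    @SuppressWarnings("unchecked")
    static <T extends Entity> T fold(LinkedList<T> duplicates) {
        T merged = duplicates.removeFirst();
        for (Entity duplicate : duplicates) {
            // each duplicate enriches the accumulated dedup record
            merged = (T) MergeUtils.mergeEntities(merged, duplicate);
        }
        return merged;
    }
}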

View File

@ -161,7 +161,6 @@ public class DedupUtility {
r.setTarget(target);
r.setSubRelType("dedupSimilarity");
r.setRelClass(ModelConstants.IS_SIMILAR_TO);
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":

View File

@ -8,20 +8,20 @@ import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Entity;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
public class IdGenerator implements Serializable {
// pick the best pid from the list (consider date and pidtype)
public static <T extends OafEntity> String generate(List<Identifier<T>> pids, String defaultID) {
public static <T extends Entity> String generate(List<Identifier<T>> pids, String defaultID) {
if (pids == null || pids.isEmpty())
return defaultID;
return generateId(pids);
}
private static <T extends OafEntity> String generateId(List<Identifier<T>> pids) {
private static <T extends Entity> String generateId(List<Identifier<T>> pids) {
Identifier<T> bp = pids
.stream()
.min(Identifier::compareTo)

View File

@ -1,26 +1,25 @@
package eu.dnetlib.dhp.oa.dedup;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Entity;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
public class IdentifierComparator<T extends OafEntity> implements Comparator<Identifier<T>> {
public class IdentifierComparator<T extends Entity> implements Comparator<Identifier<T>> {
public static int compareIdentifiers(Identifier left, Identifier right) {
return new IdentifierComparator<>().compare(left, right);
@ -75,7 +74,7 @@ public class IdentifierComparator<T extends OafEntity> implements Comparator<Ide
}
private StructuredProperty toSP(PidType pidType) {
return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), ModelConstants.DNET_PID_TYPES);
}
}
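Note: OafMapperUtils.structuredProperty loses the schemename and per-field DataInfo arguments; the qualifier is now built from (value, classid, classname, schemeid). A minimal usage sketch (the pid value is hypothetical):

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;

public class StructuredPropertySketch {
    public static void main(String[] args) {
        StructuredProperty pid = OafMapperUtils.structuredProperty(
            "10.1234/abcd", "doi", "doi", ModelConstants.DNET_PID_TYPES); // value is made up
        System.out.println(pid.getQualifier().getClassid()); // doi
    }
}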

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
import java.util.Objects;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
@ -41,8 +42,7 @@ public class RelationAggregator extends Aggregator<Relation, Relation, Relation>
return b;
}
b.mergeFrom(a);
return b;
return MergeUtils.mergeRelation(b, a);
}
@Override
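Note: relation merging likewise goes through MergeUtils.mergeRelation instead of the removed Relation.mergeFrom. A minimal null-safe sketch (class and method names are hypothetical; assumes mergeRelation combines two relations with the same source, target and relClass):

import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;

public class RelationMergeSketch {
    static Relation combine(Relation left, Relation right) {
        if (left == null)
            return right;
        if (right == null)
            return left;
        return MergeUtils.mergeRelation(left, right);
    }
}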

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -13,7 +14,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

View File

@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
import java.io.IOException;
import eu.dnetlib.dhp.schema.oaf.Entity;
import eu.dnetlib.dhp.schema.oaf.EntityDataInfo;
import eu.dnetlib.dhp.schema.oaf.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
@ -19,10 +20,6 @@ import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@ -33,7 +30,7 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
public static final String ROOT_TRUST = "0.8";
public static final float ROOT_TRUST = 0.8f;
public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
@ -81,7 +78,7 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final Class<Entity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
final DataInfo dataInfo = getDataInfo(dedupConf);
final EntityDataInfo dataInfo = getDataInfo(dedupConf);
DedupRecordFactory
.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz)
.write()
@ -91,8 +88,8 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
}
}
private static DataInfo getDataInfo(DedupConfig dedupConf) {
DataInfo info = new DataInfo();
private static EntityDataInfo getDataInfo(DedupConfig dedupConf) {
EntityDataInfo info = new EntityDataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
@ -102,7 +99,6 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
provenance.setClassid(PROVENANCE_DEDUP);
provenance.setClassname(PROVENANCE_DEDUP);
provenance.setSchemeid(DNET_PROVENANCE_ACTIONS);
provenance.setSchemename(DNET_PROVENANCE_ACTIONS);
info.setProvenanceaction(provenance);
return info;
}

View File

@ -1,13 +1,22 @@
package eu.dnetlib.dhp.oa.dedup;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@ -25,28 +34,15 @@ import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
public class SparkCreateMergeRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
@ -97,7 +93,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
final String subEntity = dedupConf.getWf().getSubEntityValue();
final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
final Class<Entity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
log.info("Creating mergerels for: '{}'", subEntity);
@ -127,12 +123,12 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
.rdd(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
Dataset<Tuple2<String, OafEntity>> entities = spark
Dataset<Tuple2<String, Entity>> entities = spark
.read()
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.map(
(MapFunction<String, Tuple2<String, OafEntity>>) it -> {
OafEntity entity = OBJECT_MAPPER.readValue(it, clazz);
(MapFunction<String, Tuple2<String, Entity>>) it -> {
Entity entity = OBJECT_MAPPER.readValue(it, clazz);
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
@ -141,14 +137,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
.joinWith(entities, rawMergeRels.col("_2").equalTo(entities.col("_1")), "inner")
// <tmp_source,target>,<target,entity>
.map(
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, OafEntity>>, Tuple2<String, OafEntity>>) value -> new Tuple2<>(
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Entity>>, Tuple2<String, Entity>>) value -> new Tuple2<>(
value._1()._1(), value._2()._2()),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
// <tmp_source,entity>
.groupByKey(
(MapFunction<Tuple2<String, OafEntity>, String>) Tuple2::_1, Encoders.STRING())
(MapFunction<Tuple2<String, Entity>, String>) Tuple2::_1, Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<String, OafEntity>, ConnectedComponent>) this::generateID,
(MapGroupsFunction<String, Tuple2<String, Entity>, ConnectedComponent>) this::generateID,
Encoders.bean(ConnectedComponent.class))
// <root_id, list(target)>
.flatMap(
@ -160,7 +156,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
}
}
private <T extends OafEntity> ConnectedComponent generateID(String key, Iterator<Tuple2<String, T>> values) {
private <T extends Entity> ConnectedComponent generateID(String key, Iterator<Tuple2<String, T>> values) {
List<Identifier<T>> identifiers = Lists
.newArrayList(values)
@ -224,20 +220,20 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
r.setSubRelType(ModelConstants.DEDUP);
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenanceAction = new Qualifier();
provenanceAction.setClassid(PROVENANCE_DEDUP);
provenanceAction.setClassname(PROVENANCE_DEDUP);
provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
info.setProvenanceaction(provenanceAction);
// TODO calculate the trust value based on the similarity score of the elements in the CC
r.setDataInfo(info);
r.setProvenance(Arrays.asList(OafMapperUtils.getProvenance(new KeyValue(), info)));
return r;
}
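Note: relations now carry a list of Provenance entries (collectedfrom plus DataInfo) rather than a single DataInfo. A minimal sketch of attaching provenance to a merge relation, mirroring the hunk above (the empty KeyValue placeholder comes from the diff; the helper name is hypothetical):

import java.util.Arrays;

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;

public class RelationProvenanceSketch {
    static void attachProvenance(Relation r, DataInfo info) {
        // wrap the (collectedfrom, dataInfo) pair into the relation's provenance list
        r.setProvenance(Arrays.asList(OafMapperUtils.getProvenance(new KeyValue(), info)));
    }
}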

View File

@ -5,6 +5,8 @@ import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import eu.dnetlib.dhp.schema.oaf.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@ -19,9 +21,8 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -165,10 +166,10 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
(MapFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>, OrgSimRel>) r -> new OrgSimRel(
"",
r._1()._2().getOriginalId().get(0),
r._1()._2().getLegalname() != null ? r._1()._2().getLegalname().getValue() : "",
r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "",
r._1()._2().getLegalname() != null ? r._1()._2().getLegalname() : "",
r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname() : "",
r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "",
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl() : "",
r._1()._2().getCollectedfrom().get(0).getValue(),
"",
structuredPropertyListToString(r._1()._2().getPid()),

View File

@ -6,6 +6,8 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import eu.dnetlib.dhp.schema.oaf.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -21,9 +23,7 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -218,10 +218,10 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
return new OrgSimRel(
r._1()._1(),
o.getOriginalId().get(0),
Optional.ofNullable(o.getLegalname()).map(Field::getValue).orElse(""),
Optional.ofNullable(o.getLegalshortname()).map(Field::getValue).orElse(""),
Optional.ofNullable(o.getLegalname()).orElse(""),
Optional.ofNullable(o.getLegalshortname()).orElse(""),
Optional.ofNullable(o.getCountry()).map(Qualifier::getClassid).orElse(""),
Optional.ofNullable(o.getWebsiteurl()).map(Field::getValue).orElse(""),
Optional.ofNullable(o.getWebsiteurl()).orElse(""),
Optional
.ofNullable(o.getCollectedfrom())
.map(c -> Optional.ofNullable(c.get(0)).map(KeyValue::getValue).orElse(""))
@ -309,10 +309,10 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
GROUP_PREFIX + r._1()._1(),
structuredPropertyListToString(r._2()._2().getPid()),

View File

@ -1,10 +1,12 @@
package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col;
import java.util.Objects;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@ -13,17 +15,13 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Objects;
import static org.apache.spark.sql.functions.col;
public class SparkPropagateRelation extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);
@ -186,11 +184,6 @@ public class SparkPropagateRelation extends AbstractSparkAction {
String newSource = value._1()._2() != null ? value._1()._2()._2() : null;
String newTarget = value._2() != null ? value._2()._2() : null;
if (r.getDataInfo() == null) {
r.setDataInfo(new DataInfo());
}
r.getDataInfo().setDeletedbyinference(false);
if (newSource != null)
r.setSource(newSource);
@ -202,13 +195,18 @@ public class SparkPropagateRelation extends AbstractSparkAction {
}
private static MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, String>>, Relation> getDeletedFn() {
//TODO the model no longer allows marking relations as deleted. We should therefore
//TODO delete them for good in this spark action.
return value -> {
if (value._2() != null) {
Relation r = value._1()._2();
/*
if (r.getDataInfo() == null) {
r.setDataInfo(new DataInfo());
}
r.getDataInfo().setDeletedbyinference(true);
*/
return r;
}
return value._1()._2();

View File

@ -4,6 +4,9 @@ package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.Map;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -24,12 +27,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
@ -146,13 +144,13 @@ public class SparkUpdateEntity extends AbstractSparkAction {
return result;
}
private static <T extends OafEntity> String updateDeletedByInference(
private static <T extends Entity> String updateDeletedByInference(
final String json, final Class<T> clazz) {
try {
Oaf entity = OBJECT_MAPPER.readValue(json, clazz);
if (entity.getDataInfo() == null)
entity.setDataInfo(new DataInfo());
entity.getDataInfo().setDeletedbyinference(true);
if (((Entity) entity).getDataInfo() == null)
((Entity) entity).setDataInfo(new EntityDataInfo());
((Entity) entity).getDataInfo().setDeletedbyinference(true);
return OBJECT_MAPPER.writeValueAsString(entity);
} catch (IOException e) {
throw new RuntimeException("Unable to convert json", e);

View File

@ -1,26 +1,23 @@
package eu.dnetlib.dhp.oa.dedup.model;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.DatePicker;
import eu.dnetlib.dhp.oa.dedup.IdentifierComparator;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
import eu.dnetlib.dhp.schema.oaf.Entity;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import org.apache.commons.lang3.StringUtils;
public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier<T>> {
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Objects;
public class Identifier<T extends Entity> implements Serializable, Comparable<Identifier<T>> {
public static final String DATE_FORMAT = "yyyy-MM-dd";
public static final String BASE_DATE = "2000-01-01";
@ -30,7 +27,7 @@ public class Identifier<T extends OafEntity> implements Serializable, Comparable
// cached date value
private Date date = null;
public static <T extends OafEntity> Identifier<T> newInstance(T entity) {
public static <T extends Entity> Identifier<T> newInstance(T entity) {
return new Identifier<>(entity);
}
@ -54,7 +51,7 @@ public class Identifier<T extends OafEntity> implements Serializable, Comparable
if (ModelSupport.isSubClass(getEntity(), Result.class)) {
Result result = (Result) getEntity();
if (isWellformed(result.getDateofacceptance())) {
sDate = result.getDateofacceptance().getValue();
sDate = result.getDateofacceptance();
}
}
try {
@ -67,9 +64,9 @@ public class Identifier<T extends OafEntity> implements Serializable, Comparable
}
}
private static boolean isWellformed(Field<String> date) {
return date != null && StringUtils.isNotBlank(date.getValue())
&& date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue());
private static boolean isWellformed(String date) {
return StringUtils.isNotBlank(date)
&& date.matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date);
}
public List<KeyValue> getCollectedFrom() {

View File

@ -20,7 +20,7 @@ class DatePickerTest {
dates.add("2016-06-16T12:00:00Z");
dates.add("2020-01-01T12:00:00Z");
dates.add("2020-10-01T12:00:00Z");
assertEquals("2020-10-01", DatePicker.pick(dates).getValue());
assertEquals("2020-10-01", DatePicker.pick(dates));
}
@Test
@ -29,7 +29,7 @@ class DatePickerTest {
dates.add("2016-06-16");
dates.add("2020-01-01");
dates.add("2020-10-01");
assertEquals("2020-10-01", DatePicker.pick(dates).getValue());
assertEquals("2020-10-01", DatePicker.pick(dates));
}
@Test
@ -38,7 +38,7 @@ class DatePickerTest {
dates.add("2016-02-01");
dates.add("2016-02-01");
dates.add("2020-10-01");
assertEquals("2016-02-01", DatePicker.pick(dates).getValue());
assertEquals("2016-02-01", DatePicker.pick(dates));
}
}

View File

@ -30,7 +30,7 @@ class EntityMergerTest implements Serializable {
private List<Tuple2<String, Publication>> publications5;
private String testEntityBasePath;
private DataInfo dataInfo;
private EntityDataInfo dataInfo;
private final String dedupId = "00|dedup_id::1";
private Publication pub_top;
@ -119,7 +119,7 @@ class EntityMergerTest implements Serializable {
assertEquals(dataInfo, pub_merged.getDataInfo());
// verify datepicker
assertEquals("2018-09-30", pub_merged.getDateofacceptance().getValue());
assertEquals("2018-09-30", pub_merged.getDateofacceptance());
// verify authors
assertEquals(13, pub_merged.getAuthor().size());
@ -185,9 +185,9 @@ class EntityMergerTest implements Serializable {
assertEquals(dedupId, pub_merged.getId());
}
public DataInfo setDI() {
DataInfo dataInfo = new DataInfo();
dataInfo.setTrust("0.9");
public EntityDataInfo setDI() {
EntityDataInfo dataInfo = new EntityDataInfo();
dataInfo.setTrust(0.9f);
dataInfo.setDeletedbyinference(false);
dataInfo.setInferenceprovenance("testing");
dataInfo.setInferred(true);
@ -196,10 +196,10 @@ class EntityMergerTest implements Serializable {
public Publication getTopPub(List<Tuple2<String, Publication>> publications) {
Double maxTrust = 0.0;
Float maxTrust = 0.0f;
Publication maxPub = new Publication();
for (Tuple2<String, Publication> publication : publications) {
Double pubTrust = Double.parseDouble(publication._2().getDataInfo().getTrust());
Float pubTrust = publication._2().getDataInfo().getTrust();
if (pubTrust > maxTrust) {
maxTrust = pubTrust;
maxPub = publication._2();

View File

@ -12,6 +12,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
@ -88,7 +89,7 @@ public class IdGeneratorTest {
assertEquals("20|openorgs____::599c15a70fcb03be6ba08f75f14d6076", id1);
}
protected static <T extends OafEntity> List<Identifier<T>> createBestIds(String path, Class<T> clazz) {
protected static <T extends Entity> List<Identifier<T>> createBestIds(String path, Class<T> clazz) {
final Stream<Identifier<T>> ids = readSample(path, clazz)
.stream()
.map(Tuple2::_2)
@ -120,10 +121,7 @@ public class IdGeneratorTest {
}
public static StructuredProperty pid(String pid, String classid, String classname) {
return OafMapperUtils.structuredProperty(pid, classid, classname, "", "", new DataInfo());
return OafMapperUtils.structuredProperty(pid, classid, classname, ModelConstants.DNET_PID_TYPES);
}
public static List<KeyValue> keyValue(String key, String value) {
return Lists.newArrayList(OafMapperUtils.keyValue(key, value));
}
}

View File

@ -522,7 +522,7 @@ public class SparkDedupTest implements Serializable {
assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
assertEquals(crossref_duplicate.getPublisher().getName(), root.getPublisher().getName());
Set<String> rootPids = root
.getPid()

View File

@ -253,7 +253,7 @@ public class SparkPublicationRootsTest implements Serializable {
assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
assertEquals(crossref_duplicate.getPublisher().getName(), root.getPublisher().getName());
Set<String> rootPids = root
.getPid()
@ -300,7 +300,7 @@ public class SparkPublicationRootsTest implements Serializable {
assertEquals(crossref_duplicate.getJournal().getIssnOnline(), root.getJournal().getIssnOnline());
assertEquals(crossref_duplicate.getJournal().getVol(), root.getJournal().getVol());
assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
assertEquals(crossref_duplicate.getPublisher().getName(), root.getPublisher().getName());
Set<String> dups_cf = pubs
.collectAsList()
@ -328,7 +328,7 @@ public class SparkPublicationRootsTest implements Serializable {
.filter("id = '50|od_______166::31ca734cc22181b704c4aa8fd050062a'")
.first();
assertEquals(pivot_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
assertEquals(pivot_duplicate.getPublisher().getName(), root.getPublisher().getName());
Set<String> dups_cf = pubs
.collectAsList()
@ -376,7 +376,7 @@ public class SparkPublicationRootsTest implements Serializable {
.textFile(graphOutputPath + "/publication")
.map(asEntity(Publication.class), Encoders.bean(Publication.class))
.filter("datainfo.deletedbyinference == true")
.map((MapFunction<Publication, String>) OafEntity::getId, Encoders.STRING())
.map((MapFunction<Publication, String>) Entity::getId, Encoders.STRING())
.distinct()
.count();
@ -390,7 +390,7 @@ public class SparkPublicationRootsTest implements Serializable {
.getResourceAsStream(path));
}
private static <T extends OafEntity> MapFunction<String, T> asEntity(Class<T> clazz) {
private static <T extends Entity> MapFunction<String, T> asEntity(Class<T> clazz) {
return value -> MAPPER.readValue(value, clazz);
}

View File

@ -195,10 +195,10 @@ public class SparkPublicationRootsTest2 implements Serializable {
.collectAsList()
.get(0);
assertEquals(crossref_duplicate.getDateofacceptance().getValue(), root.getDateofacceptance().getValue());
assertEquals(crossref_duplicate.getDateofacceptance(), root.getDateofacceptance());
assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
assertEquals(crossref_duplicate.getPublisher().getName(), root.getPublisher().getName());
Set<String> rootPids = root
.getPid()
@ -238,7 +238,7 @@ public class SparkPublicationRootsTest2 implements Serializable {
.getResourceAsStream(path));
}
private static <T extends OafEntity> MapFunction<String, T> asEntity(Class<T> clazz) {
private static <T extends Entity> MapFunction<String, T> asEntity(Class<T> clazz) {
return value -> MAPPER.readValue(value, clazz);
}

View File

@ -7,6 +7,7 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -23,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@ -83,7 +83,7 @@ public class MergeGraphTableSparkJob {
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
Class<? extends Entity> entityClazz = (Class<? extends Entity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");