forked from D-Net/dnet-hadoop
WIP: graph cleaner implementation
This commit is contained in:
parent
ba8a024af9
commit
97b1c4057c
|
@ -1,86 +1,70 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.clean;
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
import com.google.common.collect.Maps;
|
||||||
import java.util.Arrays;
|
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||||
import java.util.LinkedList;
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
import java.util.List;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class CleaningRule<T extends Oaf> implements MapFunction<T, T> {
|
public class CleaningRule<T extends Oaf> implements MapFunction<T, T> {
|
||||||
|
|
||||||
private VocabularyGroup vocabularies;
|
private VocabularyGroup vocabularies;
|
||||||
|
|
||||||
|
private Map<Class, Function<Object, Object>> mapping = Maps.newHashMap();
|
||||||
|
|
||||||
|
|
||||||
public CleaningRule(VocabularyGroup vocabularies) {
|
public CleaningRule(VocabularyGroup vocabularies) {
|
||||||
this.vocabularies = vocabularies;
|
this.vocabularies = vocabularies;
|
||||||
|
|
||||||
|
mapping.put(Qualifier.class, o -> patchQualifier(o));
|
||||||
|
mapping.put(StructuredProperty.class, o -> patchSp(o));
|
||||||
|
mapping.put(Field.class, o -> patchStringField(o));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T call(T value) throws Exception {
|
public T call(T value) throws Exception {
|
||||||
|
|
||||||
doClean(value);
|
OafNavigator.apply(value, mapping);
|
||||||
|
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doClean(Object o) {
|
private Object patchQualifier(Object o) {
|
||||||
if (Objects.isNull(o)) {
|
Qualifier q = (Qualifier) o;
|
||||||
return;
|
if (vocabularies.vocabularyExists(q.getSchemeid())) {
|
||||||
}
|
return vocabularies.lookup(q.getSchemeid(), q.getClassid());
|
||||||
|
|
||||||
if (o instanceof Iterable) {
|
|
||||||
for (Object oi : (Iterable) o) {
|
|
||||||
doClean(oi);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
|
|
||||||
Class clazz = o.getClass();
|
|
||||||
|
|
||||||
if (clazz.isPrimitive()
|
|
||||||
|| o instanceof Integer
|
|
||||||
|| o instanceof Double
|
|
||||||
|| o instanceof Float
|
|
||||||
|| o instanceof Long
|
|
||||||
|| o instanceof Boolean
|
|
||||||
|| o instanceof String) {
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
for (Field field : getAllFields(new LinkedList<>(), clazz)) {
|
|
||||||
field.setAccessible(true);
|
|
||||||
Object value = field.get(o);
|
|
||||||
if (value instanceof Qualifier) {
|
|
||||||
Qualifier q = (Qualifier) value;
|
|
||||||
if (vocabularies.vocabularyExists(q.getSchemeid())) {
|
|
||||||
field.set(o, vocabularies.lookup(q.getSchemeid(), q.getClassid()));
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
doClean(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IllegalAccessException | IllegalArgumentException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Field> getAllFields(List<Field> fields, Class<?> clazz) {
|
private Object patchSp(Object o) {
|
||||||
fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
|
StructuredProperty sp = (StructuredProperty) o;
|
||||||
|
if (StringUtils.isBlank(sp.getValue())) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
|
||||||
final Class<?> superclass = clazz.getSuperclass();
|
private Object patchStringField(Object o) {
|
||||||
if (Objects.nonNull(superclass) && superclass.getPackage().equals(Oaf.class.getPackage())) {
|
Field f = (Field) o;
|
||||||
getAllFields(fields, superclass);
|
try {
|
||||||
|
if (StringUtils.isBlank((String) f.getValue())) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
// ignored on purpose
|
||||||
}
|
}
|
||||||
|
|
||||||
return fields;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
public VocabularyGroup getVocabularies() {
|
public VocabularyGroup getVocabularies() {
|
||||||
|
|
|
@ -1,4 +1,151 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.clean;
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import java.beans.BeanInfo;
|
||||||
|
import java.beans.IntrospectionException;
|
||||||
|
import java.beans.Introspector;
|
||||||
|
import java.beans.PropertyDescriptor;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class OafNavigator {
|
public class OafNavigator {
|
||||||
|
|
||||||
|
public static <E extends Oaf> E apply(E oaf, Map<Class, Function<Object, Object>> mapping) {
|
||||||
|
reflect(oaf, mapping);
|
||||||
|
return oaf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void reflect(Object o, Map<Class, Function<Object, Object>> mapping) {
|
||||||
|
visit(o, mapping);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void visit(final Object thingy, Map<Class, Function<Object, Object>> mapping) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Class<?> clazz = thingy.getClass();
|
||||||
|
|
||||||
|
if (!isPrimitive(thingy) && clazz.getPackage().equals(Oaf.class.getPackage())) {
|
||||||
|
|
||||||
|
final BeanInfo beanInfo = Introspector.getBeanInfo(clazz);
|
||||||
|
|
||||||
|
for (final PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) {
|
||||||
|
try {
|
||||||
|
final Object value = descriptor.getReadMethod().invoke(thingy);
|
||||||
|
|
||||||
|
if (value != null && !isPrimitive(value)) {
|
||||||
|
|
||||||
|
System.out.println("VISITING " + descriptor.getName() + " " + descriptor.getPropertyType());
|
||||||
|
|
||||||
|
if (Iterable.class.isAssignableFrom(descriptor.getPropertyType())) {
|
||||||
|
for(Object vi : (Iterable) value) {
|
||||||
|
|
||||||
|
visit(vi, mapping);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
|
||||||
|
if (mapping.keySet().contains(value.getClass())) {
|
||||||
|
final Object newValue = mapping.get(value.getClass()).apply(value);
|
||||||
|
System.out.println("PATCHING " + descriptor.getName()+ " " + descriptor.getPropertyType());
|
||||||
|
System.out.println("OLD VALUE " + getObjectMapper().writeValueAsString(value));
|
||||||
|
System.out.println("NEW VALUE " + getObjectMapper().writeValueAsString(newValue));
|
||||||
|
descriptor.getWriteMethod().invoke(newValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
visit(value, mapping);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (final IllegalArgumentException e) {
|
||||||
|
// handle this please
|
||||||
|
} catch (final IllegalAccessException e) {
|
||||||
|
// and this also
|
||||||
|
} catch (final InvocationTargetException e) {
|
||||||
|
// and this, too
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (final IntrospectionException e) {
|
||||||
|
// do something sensible here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ObjectMapper getObjectMapper() {
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void navigate(Object o, Map<Class, Function<Object, Object>> mapping) {
|
||||||
|
if (Objects.isNull(o) || isPrimitive(o)) {
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
for (Field field : getAllFields(o.getClass())) {
|
||||||
|
System.out.println(field.getName());
|
||||||
|
field.setAccessible(true);
|
||||||
|
Object value = field.get(o);
|
||||||
|
|
||||||
|
if (Objects.nonNull(value)) {
|
||||||
|
final Class<?> fieldType = field.getType();
|
||||||
|
if ((fieldType.isArray() && !fieldType.getComponentType().isPrimitive())) {
|
||||||
|
Object[] fs = (Object[]) value;
|
||||||
|
for (Object fi : fs) {
|
||||||
|
navigate(fi, mapping);
|
||||||
|
}
|
||||||
|
} if (Iterable.class.isAssignableFrom(fieldType)) {
|
||||||
|
Iterable fs = (Iterable) value;
|
||||||
|
for (Object fi : fs) {
|
||||||
|
navigate(fi, mapping);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (mapping.keySet().contains(value.getClass())) {
|
||||||
|
System.out.println("PATCHING " + field.getName());
|
||||||
|
field.set(o, mapping.get(value.getClass()).apply(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (IllegalAccessException | IllegalArgumentException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isPrimitive(Object o) {
|
||||||
|
return o.getClass().isPrimitive()
|
||||||
|
|| o instanceof Class
|
||||||
|
|| o instanceof Integer
|
||||||
|
|| o instanceof Double
|
||||||
|
|| o instanceof Float
|
||||||
|
|| o instanceof Long
|
||||||
|
|| o instanceof Boolean
|
||||||
|
|| o instanceof String
|
||||||
|
|| o instanceof Byte;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Field> getAllFields(Class<?> clazz) {
|
||||||
|
return getAllFields(new LinkedList<>(), clazz);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Field> getAllFields(List<Field> fields, Class<?> clazz) {
|
||||||
|
fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
|
||||||
|
|
||||||
|
final Class<?> superclass = clazz.getSuperclass();
|
||||||
|
if (Objects.nonNull(superclass) && superclass.getPackage().equals(Oaf.class.getPackage())) {
|
||||||
|
getAllFields(fields, superclass);
|
||||||
|
}
|
||||||
|
|
||||||
|
return fields;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,7 +105,10 @@ public class VocabularyGroup implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Qualifier getTermAsQualifier(final String vocId, final String id) {
|
public Qualifier getTermAsQualifier(final String vocId, final String id) {
|
||||||
return vocs.get(vocId.toLowerCase()).getTermAsQualifier(id);
|
if (vocabularyExists(vocId)) {
|
||||||
|
return vocs.get(vocId.toLowerCase()).getTermAsQualifier(id);
|
||||||
|
}
|
||||||
|
return OafMapperUtils.qualifier(id, id, "", "");
|
||||||
}
|
}
|
||||||
|
|
||||||
public Qualifier getSynonymAsQualifier(final String vocId, final String syn) {
|
public Qualifier getSynonymAsQualifier(final String vocId, final String syn) {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import static org.mockito.Mockito.lenient;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
@ -15,6 +16,7 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.junit.platform.commons.util.StringUtils;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
@ -82,6 +84,11 @@ public class CleaningRuleTest {
|
||||||
// TODO add more assertions to verity the cleaned values
|
// TODO add more assertions to verity the cleaned values
|
||||||
System.out.println(MAPPER.writeValueAsString(p_out));
|
System.out.println(MAPPER.writeValueAsString(p_out));
|
||||||
|
|
||||||
|
assertTrue(
|
||||||
|
p_out
|
||||||
|
.getPid()
|
||||||
|
.stream()
|
||||||
|
.allMatch(sp -> StringUtils.isNotBlank(sp.getValue())));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
|
private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
|
||||||
|
|
|
@ -380,6 +380,28 @@
|
||||||
"schemename": "dnet:pid_types"
|
"schemename": "dnet:pid_types"
|
||||||
},
|
},
|
||||||
"value": "10.1007/s109090161569x"
|
"value": "10.1007/s109090161569x"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dataInfo": {
|
||||||
|
"deletedbyinference": false,
|
||||||
|
"inferenceprovenance": "",
|
||||||
|
"inferred": false,
|
||||||
|
"invisible": false,
|
||||||
|
"provenanceaction": {
|
||||||
|
"classid": "sysimport:crosswalk:datasetarchive",
|
||||||
|
"classname": "sysimport:crosswalk:datasetarchive",
|
||||||
|
"schemeid": "dnet:provenanceActions",
|
||||||
|
"schemename": "dnet:provenanceActions"
|
||||||
|
},
|
||||||
|
"trust": "0.9"
|
||||||
|
},
|
||||||
|
"qualifier": {
|
||||||
|
"classid": "doi",
|
||||||
|
"classname": "doi",
|
||||||
|
"schemeid": "dnet:pid_types",
|
||||||
|
"schemename": "dnet:pid_types"
|
||||||
|
},
|
||||||
|
"value": ""
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"relevantdate": [
|
"relevantdate": [
|
||||||
|
|
Loading…
Reference in New Issue