enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
3 changed files with 58 additions and 240 deletions
Showing only changes of commit bed65a1be6 - Show all commits

View File

@ -2,16 +2,13 @@
package eu.dnetlib.dhp.oa.graph.clean; package eu.dnetlib.dhp.oa.graph.clean;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
@ -20,54 +17,47 @@ public class CleaningRule<T extends Oaf> implements MapFunction<T, T> {
private VocabularyGroup vocabularies; private VocabularyGroup vocabularies;
private Map<Class, Function<Object, Object>> mapping = Maps.newHashMap(); private Map<Class, Consumer<Object>> mapping = Maps.newHashMap();
public CleaningRule(VocabularyGroup vocabularies) { public CleaningRule(VocabularyGroup vocabularies) {
this.vocabularies = vocabularies; this.vocabularies = vocabularies;
setMappings(vocabularies);
mapping.put(Qualifier.class, o -> patchQualifier(o));
mapping.put(StructuredProperty.class, o -> patchSp(o));
mapping.put(Field.class, o -> patchStringField(o));
} }
@Override @Override
public T call(T value) throws Exception { public T call(T value) throws Exception {
OafNavigator2.apply(value, mapping); OafNavigator.apply(value, mapping);
return value; return value;
} }
private Object patchQualifier(Object o) { /**
Qualifier q = (Qualifier) o; * Populates the mapping for the Oaf types subject to cleaning
if (vocabularies.vocabularyExists(q.getSchemeid())) { *
return vocabularies.lookup(q.getSchemeid(), q.getClassid()); * @param vocabularies
} */
return o; private void setMappings(VocabularyGroup vocabularies) {
} mapping.put(Qualifier.class, o -> {
Qualifier q = (Qualifier) o;
private Object patchSp(Object o) { if (vocabularies.vocabularyExists(q.getSchemeid())) {
StructuredProperty sp = (StructuredProperty) o; Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
if (StringUtils.isBlank(sp.getValue())) { q.setClassid(newValue.getClassid());
return null; q.setClassname(newValue.getClassname());
}
return o;
}
private Object patchStringField(Object o) {
Field f = (Field) o;
try {
if (StringUtils.isBlank((String) f.getValue())) {
return null;
} }
} catch (ClassCastException e) { });
// ignored on purpose mapping.put(StructuredProperty.class, o -> {
} StructuredProperty sp = (StructuredProperty) o;
// TODO implement a policy
return o; /*
* if (StringUtils.isBlank(sp.getValue())) { sp.setValue(null); sp.setQualifier(null); sp.setDataInfo(null);
* }
*/
});
} }
public VocabularyGroup getVocabularies() { public VocabularyGroup getVocabularies() {
return vocabularies; return vocabularies;
} }
} }

View File

@ -1,132 +1,56 @@
package eu.dnetlib.dhp.oa.graph.clean; package eu.dnetlib.dhp.oa.graph.clean;
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.*; import java.util.*;
import java.util.function.Function; import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import scala.Tuple2;
public class OafNavigator { public class OafNavigator {
public static <E extends Oaf> E apply(E oaf, Map<Class, Function<Object, Object>> mapping) { public static <E extends Oaf> E apply(E oaf, Map<Class, Consumer<Object>> mapping) {
reflect(oaf, mapping); try {
navigate(oaf, mapping);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
return oaf; return oaf;
} }
public static void reflect(Object o, Map<Class, Function<Object, Object>> mapping) { private static void navigate(Object o, Map<Class, Consumer<Object>> mapping) throws IllegalAccessException {
visit(o, mapping); if (isPrimitive(o)) {
}
public static void visit(final Object thingy, Map<Class, Function<Object, Object>> mapping) {
try {
final Class<?> clazz = thingy.getClass();
if (!isPrimitive(thingy) && clazz.getPackage().equals(Oaf.class.getPackage())) {
final BeanInfo beanInfo = Introspector.getBeanInfo(clazz);
for (final PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) {
try {
final Object value = descriptor.getReadMethod().invoke(thingy);
if (value != null && !isPrimitive(value)) {
System.out.println("VISITING " + descriptor.getName() + " " + descriptor.getPropertyType());
if (Iterable.class.isAssignableFrom(descriptor.getPropertyType())) {
for (Object vi : (Iterable) value) {
visit(vi, mapping);
}
} else {
if (mapping.keySet().contains(value.getClass())) {
final Object newValue = mapping.get(value.getClass()).apply(value);
System.out
.println(
"PATCHING " + descriptor.getName() + " " + descriptor.getPropertyType());
System.out.println("OLD VALUE " + getObjectMapper().writeValueAsString(value));
System.out.println("NEW VALUE " + getObjectMapper().writeValueAsString(newValue));
descriptor.getWriteMethod().invoke(newValue);
}
visit(value, mapping);
}
}
} catch (final IllegalArgumentException e) {
// handle this please
} catch (final IllegalAccessException e) {
// and this also
} catch (final InvocationTargetException e) {
// and this, too
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
} catch (final IntrospectionException e) {
// do something sensible here
}
}
private static ObjectMapper getObjectMapper() {
final ObjectMapper mapper = new ObjectMapper();
return mapper;
}
private static void navigate(Object o, Map<Class, Function<Object, Object>> mapping) {
if (Objects.isNull(o) || isPrimitive(o)) {
return; return;
} else if (isIterable(o.getClass())) {
for (final Object elem : (Iterable<?>) o) {
navigate(elem, mapping);
}
} else if (hasMapping(o, mapping)) {
mapping.get(o.getClass()).accept(o);
} else { } else {
try { for (final Field f : getAllFields(o.getClass())) {
for (Field field : getAllFields(o.getClass())) { f.setAccessible(true);
System.out.println(field.getName()); final Object val = f.get(o);
field.setAccessible(true); if (!isPrimitive(val) && hasMapping(val, mapping)) {
Object value = field.get(o); mapping.get(val.getClass()).accept(val);
} else {
if (Objects.nonNull(value)) { navigate(f.get(o), mapping);
final Class<?> fieldType = field.getType();
if ((fieldType.isArray() && !fieldType.getComponentType().isPrimitive())) {
Object[] fs = (Object[]) value;
for (Object fi : fs) {
navigate(fi, mapping);
}
}
if (Iterable.class.isAssignableFrom(fieldType)) {
Iterable fs = (Iterable) value;
for (Object fi : fs) {
navigate(fi, mapping);
}
} else {
if (mapping.keySet().contains(value.getClass())) {
System.out.println("PATCHING " + field.getName());
field.set(o, mapping.get(value.getClass()).apply(value));
}
}
}
} }
} catch (IllegalAccessException | IllegalArgumentException e) {
throw new RuntimeException(e);
} }
} }
} }
private static boolean hasMapping(Object o, Map<Class, Consumer<Object>> mapping) {
return mapping.containsKey(o.getClass());
}
private static boolean isIterable(final Class<?> cl) {
return Iterable.class.isAssignableFrom(cl);
}
private static boolean isPrimitive(Object o) { private static boolean isPrimitive(Object o) {
return o.getClass().isPrimitive() return Objects.isNull(o)
|| o.getClass().isPrimitive()
|| o instanceof Class || o instanceof Class
|| o instanceof Integer || o instanceof Integer
|| o instanceof Double || o instanceof Double

View File

@ -1,96 +0,0 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.Function;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Oaf;
/**
 * Reflection-based walker that applies per-type cleaning functions to the fields of an {@link Oaf} object.
 *
 * <p>For every non-static field of the visited object (including fields inherited from superclasses that live in
 * the same package as {@link Oaf}):
 * <ul>
 * <li>arrays of references and {@link Iterable}s are traversed and each element is visited in turn;</li>
 * <li>a value whose exact runtime class is a key of the mapping is passed to the associated function, and the
 * field is overwritten when the function returns something not {@code equals} to the current value;</li>
 * <li>any other value whose class belongs to the {@link Oaf} package is visited recursively.</li>
 * </ul>
 *
 * <p>NOTE(review): the mapping is applied only to values held directly in a field. Elements of arrays and
 * Iterables are descended into, but are never themselves replaced; confirm whether that is the desired policy.
 */
public class OafNavigator2 {

    /**
     * Cleans {@code oaf} in place using the given cleaning functions.
     *
     * @param oaf the object to clean; its fields are modified in place
     * @param mapping cleaning function for each exact runtime class to be patched; the value returned by the
     *        function (possibly {@code null}) replaces the field value when it differs from the current one
     * @param <E> the concrete Oaf type
     * @return the same {@code oaf} instance that was passed in
     * @throws RuntimeException wrapping any reflection failure met while reading or writing a field
     */
    public static <E extends Oaf> E apply(E oaf, Map<Class, Function<Object, Object>> mapping) {
        navigate(oaf, mapping);
        return oaf;
    }

    /**
     * Visits the fields of {@code o}, patching mapped values and descending into collections and nested Oaf objects.
     */
    private static void navigate(Object o, Map<Class, Function<Object, Object>> mapping) {
        if (Objects.isNull(o) || isPrimitive(o)) {
            return;
        }
        try {
            for (Field field : getAllFields(o.getClass())) {
                // Static state (e.g. serialVersionUID, constants) is not part of the record being cleaned,
                // and writing a static final field reflectively would fail.
                if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
                    continue;
                }
                field.setAccessible(true);
                final Object value = field.get(o);
                if (Objects.isNull(value)) {
                    continue;
                }
                final Class<?> fieldType = field.getType();
                if (fieldType.isArray()) {
                    // Arrays of primitives hold nothing to clean; arrays of references are traversed.
                    if (!fieldType.getComponentType().isPrimitive()) {
                        for (Object element : (Object[]) value) {
                            navigate(element, mapping);
                        }
                    }
                } else if (Iterable.class.isAssignableFrom(fieldType)) {
                    for (Object element : (Iterable<?>) value) {
                        navigate(element, mapping);
                    }
                } else {
                    patchOrDescend(o, field, value, mapping);
                }
            }
        } catch (IllegalAccessException | IllegalArgumentException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies the cleaning function registered for the value's class, or, when there is none, descends into the
     * value if it is a nested Oaf-model object.
     */
    private static void patchOrDescend(
            Object owner, Field field, Object value, Map<Class, Function<Object, Object>> mapping)
            throws IllegalAccessException {
        final Function<Object, Object> cleaningFn = mapping.get(value.getClass());
        if (Objects.nonNull(cleaningFn)) {
            final Object newValue = cleaningFn.apply(value);
            // Write back only on an actual change, to avoid needless reflective writes.
            if (!Objects.equals(value, newValue)) {
                field.set(owner, newValue);
            }
        } else if (isOafType(value.getClass())) {
            // Unmapped nested model objects may still contain mapped values deeper down.
            navigate(value, mapping);
        }
    }

    /**
     * Tells whether the given class is declared in the same package as {@link Oaf}.
     */
    private static boolean isOafType(Class<?> clazz) {
        // Objects.equals tolerates classes for which getPackage() returns null.
        return Objects.equals(clazz.getPackage(), Oaf.class.getPackage());
    }

    /**
     * Tells whether {@code o} is a leaf value (boxed primitive, String or Class) that must not be reflected into.
     */
    private static boolean isPrimitive(Object o) {
        // The runtime class of an object reference is never a primitive type, so only wrappers are checked.
        return o instanceof Class
            || o instanceof Integer
            || o instanceof Double
            || o instanceof Float
            || o instanceof Long
            || o instanceof Short
            || o instanceof Byte
            || o instanceof Character
            || o instanceof Boolean
            || o instanceof String;
    }

    /**
     * Collects the fields declared by {@code clazz} and by its superclasses living in the {@link Oaf} package.
     */
    private static List<Field> getAllFields(Class<?> clazz) {
        return getAllFields(new ArrayList<>(), clazz);
    }

    /**
     * Accumulates into {@code fields} the declared fields of {@code clazz}, then walks up the hierarchy for as
     * long as the superclass belongs to the {@link Oaf} package.
     */
    private static List<Field> getAllFields(List<Field> fields, Class<?> clazz) {
        fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
        final Class<?> superclass = clazz.getSuperclass();
        if (Objects.nonNull(superclass) && isOafType(superclass)) {
            getAllFields(fields, superclass);
        }
        return fields;
    }
}