forked from D-Net/dnet-hadoop
added simpler, AtomicAction replacement, based on the dhp.Oaf model
This commit is contained in:
parent
abe8fb69a2
commit
1850a02ae4
|
@ -30,7 +30,12 @@
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
<artifactId>jackson-databind</artifactId>
|
<artifactId>jackson-databind</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.guava</groupId>
|
||||||
|
<artifactId>guava</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
package eu.dnetlib.dhp.schema.action;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
@JsonDeserialize(using = AtomicActionDeserializer.class)
|
||||||
|
public class AtomicAction<T extends Oaf> implements Serializable {
|
||||||
|
|
||||||
|
private Class<T> clazz;
|
||||||
|
|
||||||
|
private T payload;
|
||||||
|
|
||||||
|
public AtomicAction() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public AtomicAction(Class<T> clazz, T payload) {
|
||||||
|
this.clazz = clazz;
|
||||||
|
this.payload = payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Class<T> getClazz() {
|
||||||
|
return clazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClazz(Class<T> clazz) {
|
||||||
|
this.clazz = clazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
public T getPayload() {
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPayload(T payload) {
|
||||||
|
this.payload = payload;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package eu.dnetlib.dhp.schema.action;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonParser;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||||
|
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class AtomicActionDeserializer extends JsonDeserializer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
|
||||||
|
JsonNode node = jp.getCodec().readTree(jp);
|
||||||
|
String classTag = node.get("clazz").asText();
|
||||||
|
JsonNode payload = node.get("payload");
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Class<?> clazz = Class.forName(classTag);
|
||||||
|
return new AtomicAction(clazz, (Oaf) mapper.readValue(payload.toString(), clazz));
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,10 +2,11 @@ package eu.dnetlib.dhp.schema.oaf;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
|
||||||
public class Relation extends Oaf {
|
public class Relation extends Oaf {
|
||||||
|
|
||||||
|
@ -70,14 +71,34 @@ public class Relation extends Oaf {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void mergeFrom(final Relation r) {
|
public void mergeFrom(final Relation r) {
|
||||||
Assert.assertEquals("source ids must be equal", getSource(), r.getSource());
|
|
||||||
Assert.assertEquals("target ids must be equal", getTarget(), r.getTarget());
|
checkArgument(Objects.equals(getSource(), r.getSource()),"source ids must be equal");
|
||||||
Assert.assertEquals("relType(s) must be equal", getRelType(), r.getRelType());
|
checkArgument(Objects.equals(getTarget(), r.getTarget()),"target ids must be equal");
|
||||||
Assert.assertEquals("subRelType(s) must be equal", getSubRelType(), r.getSubRelType());
|
checkArgument(Objects.equals(getRelType(), r.getRelType()),"relType(s) must be equal");
|
||||||
Assert.assertEquals("relClass(es) must be equal", getRelClass(), r.getRelClass());
|
checkArgument(Objects.equals(getSubRelType(), r.getSubRelType()),"subRelType(s) must be equal");
|
||||||
|
checkArgument(Objects.equals(getRelClass(), r.getRelClass()),"relClass(es) must be equal");
|
||||||
|
|
||||||
setCollectedFrom(Stream.concat(getCollectedFrom().stream(), r.getCollectedFrom().stream())
|
setCollectedFrom(Stream.concat(getCollectedFrom().stream(), r.getCollectedFrom().stream())
|
||||||
.distinct() // relies on KeyValue.equals
|
.distinct() // relies on KeyValue.equals
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
Relation relation = (Relation) o;
|
||||||
|
return relType.equals(relation.relType) &&
|
||||||
|
subRelType.equals(relation.subRelType) &&
|
||||||
|
relClass.equals(relation.relClass) &&
|
||||||
|
source.equals(relation.source) &&
|
||||||
|
target.equals(relation.target) &&
|
||||||
|
Objects.equals(collectedFrom, relation.collectedFrom);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(relType, subRelType, relClass, source, target, collectedFrom);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
package eu.dnetlib.dhp.schema.action;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class AtomicActionTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void serializationTest() throws IOException {
|
||||||
|
|
||||||
|
Relation rel = new Relation();
|
||||||
|
rel.setSource("1");
|
||||||
|
rel.setTarget("2");
|
||||||
|
rel.setRelType("resultResult");
|
||||||
|
rel.setSubRelType("dedup");
|
||||||
|
rel.setRelClass("merges");
|
||||||
|
|
||||||
|
AtomicAction aa1 = new AtomicAction(Relation.class, rel);
|
||||||
|
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
String json = mapper.writeValueAsString(aa1);
|
||||||
|
|
||||||
|
Assert.assertTrue(StringUtils.isNotBlank(json));
|
||||||
|
|
||||||
|
AtomicAction aa2 = mapper.readValue(json, AtomicAction.class);
|
||||||
|
|
||||||
|
Assert.assertEquals(aa1.getClazz(), aa2.getClazz());
|
||||||
|
Assert.assertEquals(aa1.getPayload(), aa2.getPayload());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
8
pom.xml
8
pom.xml
|
@ -148,6 +148,13 @@
|
||||||
<version>${dhp.commons.lang.version}</version>
|
<version>${dhp.commons.lang.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.guava</groupId>
|
||||||
|
<artifactId>guava</artifactId>
|
||||||
|
<version>${dhp.guava.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-codec</groupId>
|
<groupId>commons-codec</groupId>
|
||||||
<artifactId>commons-codec</artifactId>
|
<artifactId>commons-codec</artifactId>
|
||||||
|
@ -496,6 +503,7 @@
|
||||||
<dhp.spark.version>2.4.0.cloudera2</dhp.spark.version>
|
<dhp.spark.version>2.4.0.cloudera2</dhp.spark.version>
|
||||||
<dhp.jackson.version>2.9.6</dhp.jackson.version>
|
<dhp.jackson.version>2.9.6</dhp.jackson.version>
|
||||||
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
||||||
|
<dhp.guava.version>28.2-jre</dhp.guava.version>
|
||||||
<scala.version>2.11.12</scala.version>
|
<scala.version>2.11.12</scala.version>
|
||||||
<junit.version>4.12</junit.version>
|
<junit.version>4.12</junit.version>
|
||||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||||
|
|
Loading…
Reference in New Issue