[OpenOrgsWf] trivial refactoring

This commit is contained in:
Claudio Atzori 2021-04-01 10:30:51 +02:00
parent 7941d7be29
commit 70e49ed53c
11 changed files with 71 additions and 194 deletions

View File

@ -6,10 +6,12 @@ import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@ -23,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -117,4 +120,26 @@ abstract class AbstractSparkAction implements Serializable {
.map(p -> p.getValue() + TYPE_VALUE_SEPARATOR + p.getQualifier().getClassid())
.collect(Collectors.joining(SP_SEPARATOR));
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
protected static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
/**
 * Tells whether the given relation lists OpenOrgs among its {@code collectedfrom} entries.
 *
 * An entry matches when its value equals {@link ModelConstants#OPENORGS_NAME}. A missing
 * {@code collectedfrom} list, null entries and entries with a null value never match
 * (the comparison is written constant-first so that a null value cannot raise a
 * NullPointerException).
 *
 * @param rel the relation to inspect
 * @return true if at least one collectedfrom entry has the OpenOrgs name as value, false otherwise
 */
protected boolean isOpenorgs(Relation rel) {
    return Optional
        .ofNullable(rel.getCollectedfrom())
        .map(
            c -> c
                .stream()
                .anyMatch(kv -> kv != null && ModelConstants.OPENORGS_NAME.equals(kv.getValue())))
        .orElse(false);
}
}

View File

@ -2,14 +2,8 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.text.Normalizer;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
@ -18,21 +12,19 @@ import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import com.google.common.collect.Sets;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.Person;
import scala.Tuple2;
public class DedupUtility {
public static final String OPENORGS_ID_PREFIX = "openorgs____";
public static final String CORDA_ID_PREFIX = "corda";
public static Map<String, LongAccumulator> constructAccumulator(
final DedupConfig dedupConf, final SparkContext context) {
@ -134,4 +126,24 @@ public class DedupUtility {
dedupConfig.getWf().setConfigurationId(actionSetId);
return dedupConfig;
}
/**
 * Compares two identifiers giving precedence to the ones containing the OpenOrgs marker,
 * then to the ones containing the CORDA marker.
 *
 * The markers ({@code OPENORGS_ID_PREFIX}, {@code CORDA_ID_PREFIX}) are looked up with
 * {@link String#contains}, i.e. anywhere in the identifier. When both identifiers contain
 * the same marker, or neither contains any marker, the natural String ordering is used.
 *
 * @param o1 the first identifier
 * @param o2 the second identifier
 * @return a negative value if o1 comes first, a positive value if o2 comes first,
 *         the result of {@code o1.compareTo(o2)} otherwise
 */
public static int compareOpenOrgIds(String o1, String o2) {
    final boolean firstIsOpenorgs = o1.contains(OPENORGS_ID_PREFIX);
    final boolean secondIsOpenorgs = o2.contains(OPENORGS_ID_PREFIX);
    final boolean firstIsCorda = o1.contains(CORDA_ID_PREFIX);
    final boolean secondIsCorda = o2.contains(CORDA_ID_PREFIX);

    // both ids share a marker: plain lexicographic order decides
    if ((firstIsOpenorgs && secondIsOpenorgs) || (firstIsCorda && secondIsCorda)) {
        return o1.compareTo(o2);
    }
    // at most one of the two contains the OpenOrgs marker here
    if (firstIsOpenorgs || secondIsOpenorgs) {
        return firstIsOpenorgs ? -1 : 1;
    }
    // at most one of the two contains the CORDA marker here
    if (firstIsCorda || secondIsCorda) {
        return firstIsCorda ? -1 : 1;
    }
    return o1.compareTo(o2);
}
}

View File

@ -94,7 +94,7 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
log.info("Number of organization entities processed: {}", entities.count());
entities = entities.filter(entities.col("id").contains("openorgs____"));
entities = entities.filter(entities.col("id").contains(DedupUtility.OPENORGS_ID_PREFIX));
log.info("Number of Openorgs organization entities: {}", entities.count());

View File

@ -124,35 +124,10 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
.parquet(outputPath);
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
private static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
private boolean filterOpenorgsRels(Relation rel) {
if (rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
return rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
&& rel.getRelType().equals(ModelConstants.ORG_ORG_RELTYPE)
&& rel.getSubRelType().equals(ModelConstants.DEDUP))
return true;
return false;
}
private boolean isOpenorgs(Relation rel) {
if (rel.getCollectedfrom() != null) {
for (KeyValue k : rel.getCollectedfrom()) {
if (k.getValue() != null && k.getValue().equals(ModelConstants.OPENORGS_NAME)) {
return true;
}
}
}
return false;
&& rel.getSubRelType().equals(ModelConstants.DEDUP);
}
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {

View File

@ -81,34 +81,10 @@ public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
log.info("Copied " + rawRels.count() + " Similarity Relations");
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
private static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
private boolean filterOpenorgsRels(Relation rel) {
if (rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
return rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
&& rel.getRelType().equals(ModelConstants.ORG_ORG_RELTYPE)
&& rel.getSubRelType().equals(ModelConstants.DEDUP) && isOpenorgs(rel))
return true;
return false;
&& rel.getSubRelType().equals(ModelConstants.DEDUP) && isOpenorgs(rel);
}
/**
 * Tells whether the given relation lists OpenOrgs among its {@code collectedfrom} entries.
 *
 * An entry matches when its value is non-null and equals {@link ModelConstants#OPENORGS_NAME}.
 *
 * @param rel the relation to inspect
 * @return true if at least one collectedfrom entry matches, false otherwise
 *         (including when the collectedfrom list is null)
 */
private boolean isOpenorgs(Relation rel) {
    if (rel.getCollectedfrom() == null) {
        return false;
    }
    return rel
        .getCollectedfrom()
        .stream()
        .anyMatch(k -> k.getValue() != null && k.getValue().equals(ModelConstants.OPENORGS_NAME));
}
}

View File

@ -86,25 +86,4 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
private static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
/**
 * Tells whether the given relation lists the OpenOrgs database among its
 * {@code collectedfrom} entries.
 *
 * An entry matches when its value equals the literal {@code "OpenOrgs Database"};
 * entries with a null value never match.
 *
 * @param rel the relation to inspect
 * @return true if at least one collectedfrom entry matches, false otherwise
 *         (including when the collectedfrom list is null)
 */
private boolean isOpenorgs(Relation rel) {
    if (rel.getCollectedfrom() == null) {
        return false;
    }
    // constant-first equals: a null value simply does not match
    return rel
        .getCollectedfrom()
        .stream()
        .anyMatch(k -> "OpenOrgs Database".equals(k.getValue()));
}
}

View File

@ -127,7 +127,7 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
.filter(r -> filterRels(r, "organization"))
// take the worst id of the diffrel: <other id, "diffRel">
.mapToPair(rel -> {
if (compareIds(rel.getSource(), rel.getTarget()) > 0)
if (DedupUtility.compareOpenOrgIds(rel.getSource(), rel.getTarget()) > 0)
return new Tuple2<>(rel.getSource(), "diffRel");
else
return new Tuple2<>(rel.getTarget(), "diffRel");
@ -200,35 +200,6 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
}
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
private static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
/**
 * Compares two identifiers giving precedence to the ones containing {@code "openorgs____"},
 * then to the ones containing {@code "corda"}.
 *
 * The markers are looked up with {@link String#contains}, i.e. anywhere in the identifier.
 * When both identifiers contain the same marker, or neither contains any marker, the
 * natural String ordering is used.
 *
 * @param o1 the first identifier
 * @param o2 the second identifier
 * @return a negative value if o1 comes first, a positive value if o2 comes first,
 *         the result of {@code o1.compareTo(o2)} otherwise
 */
public static int compareIds(String o1, String o2) {
    final boolean firstIsOpenorgs = o1.contains("openorgs____");
    final boolean secondIsOpenorgs = o2.contains("openorgs____");
    final boolean firstIsCorda = o1.contains("corda");
    final boolean secondIsCorda = o2.contains("corda");

    // both ids share a marker: plain lexicographic order decides
    if ((firstIsOpenorgs && secondIsOpenorgs) || (firstIsCorda && secondIsCorda)) {
        return o1.compareTo(o2);
    }
    // at most one of the two contains the openorgs marker here
    if (firstIsOpenorgs || secondIsOpenorgs) {
        return firstIsOpenorgs ? -1 : 1;
    }
    // at most one of the two contains the corda marker here
    if (firstIsCorda || secondIsCorda) {
        return firstIsCorda ? -1 : 1;
    }
    return o1.compareTo(o2);
}
private static boolean filterRels(Relation rel, String entityType) {
switch (entityType) {

View File

@ -14,7 +14,6 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,9 +23,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
@ -35,8 +32,6 @@ import scala.Tuple3;
public class SparkPrepareOrgRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class);
public static final String OPENORGS_ID_PREFIX = "openorgs____";
public static final String CORDA_ID_PREFIX = "corda";
public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
@ -141,7 +136,7 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
.filter(r -> filterRels(r, "organization"))
// put the best id as source of the diffrel: <best id, other id>
.map(rel -> {
if (compareIds(rel.getSource(), rel.getTarget()) < 0)
if (DedupUtility.compareOpenOrgIds(rel.getSource(), rel.getTarget()) < 0)
return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
else
return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
@ -216,17 +211,20 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
final Organization o = r._2()._2();
return new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
o.getOriginalId().get(0),
Optional.ofNullable(o.getLegalname()).map(Field::getValue).orElse(""),
Optional.ofNullable(o.getLegalshortname()).map(Field::getValue).orElse(""),
Optional.ofNullable(o.getCountry()).map(Qualifier::getClassid).orElse(""),
Optional.ofNullable(o.getWebsiteurl()).map(Field::getValue).orElse(""),
Optional
.ofNullable(o.getCollectedfrom())
.map(c -> Optional.ofNullable(c.get(0)).map(KeyValue::getValue).orElse(""))
.orElse(""),
r._1()._3(),
structuredPropertyListToString(r._2()._2().getPid()));
structuredPropertyListToString(o.getPid()));
},
Encoders.bean(OrgSimRel.class))
.map(
@ -245,28 +243,9 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
}
/**
 * Compares two identifiers giving precedence to the ones containing the OpenOrgs marker,
 * then to the ones containing the CORDA marker.
 *
 * The markers ({@code OPENORGS_ID_PREFIX}, {@code CORDA_ID_PREFIX}) are looked up with
 * {@link String#contains}, i.e. anywhere in the identifier. When both identifiers contain
 * the same marker, or neither contains any marker, the natural String ordering is used.
 *
 * @param o1 the first identifier
 * @param o2 the second identifier
 * @return a negative value if o1 comes first, a positive value if o2 comes first,
 *         the result of {@code o1.compareTo(o2)} otherwise
 */
public static int compareIds(String o1, String o2) {
    final boolean firstIsOpenorgs = o1.contains(OPENORGS_ID_PREFIX);
    final boolean secondIsOpenorgs = o2.contains(OPENORGS_ID_PREFIX);
    final boolean firstIsCorda = o1.contains(CORDA_ID_PREFIX);
    final boolean secondIsCorda = o2.contains(CORDA_ID_PREFIX);

    // both ids share a marker: plain lexicographic order decides
    if ((firstIsOpenorgs && secondIsOpenorgs) || (firstIsCorda && secondIsCorda)) {
        return o1.compareTo(o2);
    }
    // at most one of the two contains the OpenOrgs marker here
    if (firstIsOpenorgs || secondIsOpenorgs) {
        return firstIsOpenorgs ? -1 : 1;
    }
    // at most one of the two contains the CORDA marker here
    if (firstIsCorda || secondIsCorda) {
        return firstIsCorda ? -1 : 1;
    }
    return o1.compareTo(o2);
}
// Sort IDs based on their type. Priority: 1) openorgs, 2) corda, 3) alphabetic
public static List<String> sortIds(List<String> ids) {
ids.sort((o1, o2) -> compareIds(o1, o2));
ids.sort((o1, o2) -> DedupUtility.compareOpenOrgIds(o1, o2));
return ids;
}
@ -301,7 +280,7 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
for (String id1 : g._2()) {
for (String id2 : g._2()) {
if (!id1.equals(id2))
if (id1.contains(OPENORGS_ID_PREFIX) && !id2.contains("openorgsmesh"))
if (id1.contains(DedupUtility.OPENORGS_ID_PREFIX) && !id2.contains("openorgsmesh"))
rels.add(new Tuple2<>(id1, id2));
}
}
@ -340,13 +319,4 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
private static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
}

View File

@ -142,16 +142,6 @@ public class SparkPropagateRelation extends AbstractSparkAction {
StringUtils.isNotBlank(r.getRelClass());
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
private static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
private static String getId(Relation r, FieldType type) {
switch (type) {
case SOURCE:

View File

@ -111,7 +111,7 @@
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
<arg>--numPartitions</arg><arg>1000</arg>
</spark>
<ok to="CopyOpenorgsSimRels"/>
<error to="Kill"/>
@ -139,7 +139,7 @@
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
<arg>--numPartitions</arg><arg>1000</arg>
</spark>
<ok to="CreateMergeRels"/>
<error to="Kill"/>

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@ -16,14 +15,7 @@ import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
@ -35,12 +27,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import jdk.nashorn.internal.ir.annotations.Ignore;
@ExtendWith(MockitoExtension.class)
public class SparkOpenorgsTest implements Serializable {
@ -224,13 +212,4 @@ public class SparkOpenorgsTest implements Serializable {
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
/**
 * Builds the mapping function used to turn a serialized relation into a {@link Relation}.
 *
 * The returned function parses its input string with {@code OBJECT_MAPPER} and, when the
 * parsed relation carries no {@link DataInfo}, attaches a freshly created empty one.
 *
 * @return a function mapping a serialized relation to a {@link Relation} with a non-null DataInfo
 */
private static MapFunction<String, Relation> patchRelFn() {
    return json -> {
        final Relation parsed = OBJECT_MAPPER.readValue(json, Relation.class);
        if (parsed.getDataInfo() != null) {
            return parsed;
        }
        // no DataInfo in the serialized form: patch it with an empty instance
        parsed.setDataInfo(new DataInfo());
        return parsed;
    };
}
}