minor changes and implementation of the create connected components action

This commit is contained in:
miconis 2020-03-19 15:01:07 +01:00
parent 679b5869e5
commit 4e82a24af2
8 changed files with 231 additions and 145 deletions

View File

@ -4,6 +4,9 @@ import com.google.common.collect.Sets;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.config.DedupConfig;
@ -20,9 +23,14 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import scala.Tuple2;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
@ -54,38 +62,6 @@ public class DedupUtility {
return accumulators;
}
/**
 * Loads the text content stored under {@code path} through the given Spark context.
 *
 * @param path    location passed as-is to {@code JavaSparkContext.textFile}
 * @param context Spark context used to perform the read
 * @return the RDD of strings produced by {@code textFile} for that path
 */
public static JavaRDD<String> loadDataFromHDFS(String path, JavaSparkContext context) {
    final JavaRDD<String> lines = context.textFile(path);
    return lines;
}
/**
 * Removes {@code path} (recursively) from the default Hadoop file system, doing nothing
 * when the path is not there.
 *
 * @param path the file or directory to remove
 * @throws IOException if the file system cannot be obtained or the check/delete fails
 */
public static void deleteIfExists(String path) throws IOException {
    final FileSystem fs = FileSystem.get(new Configuration());
    final Path target = new Path(path);
    if (!fs.exists(target)) {
        // nothing to clean up
        return;
    }
    fs.delete(target, true);
}
/**
 * Reads a dedup configuration stored as a UTF-8 text file on the default Hadoop file system.
 *
 * @param path location of the configuration file
 * @return the configuration built by {@code DedupConfig.load} from the file content
 * @throws IOException if the file system cannot be obtained or the file cannot be read
 */
public static DedupConfig loadConfigFromHDFS(String path) throws IOException {
    final FileSystem fileSystem = FileSystem.get(new Configuration());
    // FileSystem.open already returns an FSDataInputStream, so no extra wrapping is needed;
    // try-with-resources guarantees the stream is released even when reading/parsing fails.
    try (FSDataInputStream inputStream = fileSystem.open(new Path(path))) {
        return DedupConfig.load(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name()));
    }
}
/**
 * Reads a classpath resource, resolved relative to {@code clazz}, into a string.
 *
 * @param filename resource name as accepted by {@link Class#getResourceAsStream(String)}
 * @param clazz    class whose class loader is used to locate the resource
 * @return the full content of the resource
 * @throws RuntimeException if the resource does not exist or cannot be read
 */
static <T> String readFromClasspath(final String filename, final Class<T> clazz) {
    // try-with-resources closes the stream; a null resource is tolerated by the construct
    try (java.io.InputStream in = clazz.getResourceAsStream(filename)) {
        if (in == null) {
            // getResourceAsStream signals "not found" with null: fail with a meaningful message
            throw new RuntimeException("cannot load resource from classpath: " + filename);
        }
        final StringWriter sw = new StringWriter();
        IOUtils.copy(in, sw);
        return sw.toString();
    } catch (final IOException e) {
        // keep the original cause so the failure can be diagnosed
        throw new RuntimeException("cannot load resource from classpath: " + filename, e);
    }
}
/**
 * Collects into a set the values produced by
 * {@code BlacklistAwareClusteringCombiner.filterAndCombine} for the given document
 * and dedup configuration, so repeated values appear only once.
 *
 * @param conf dedup configuration handed to the clustering combiner
 * @param doc  document for which the keys are computed
 * @return a new hash set holding the combiner output
 */
static Set<String> getGroupingKeys(DedupConfig conf, MapDocument doc) {
return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf));
}
@ -150,12 +126,12 @@ public class DedupUtility {
return String.format("%s/%s", basePath, entityType);
}
public static String createSimRelPath(final String basePath, final String entityType) {
return String.format("%s/%s_simRel", basePath, entityType);
public static String createSimRelPath(final String basePath, final String actionSetId,final String entityType) {
return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
}
public static String createMergeRelPath(final String basePath, final String entityType) {
return String.format("%s/%s_mergeRel", basePath, entityType);
public static String createMergeRelPath(final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
}
private static Double sim(Author a, Author b) {
@ -216,4 +192,37 @@ public class DedupUtility {
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
/**
 * Loads every dedup configuration referenced by an orchestrator profile.
 * <p>
 * The orchestrator profile is the resource profile whose
 * {@code DEDUPLICATION/ACTION_SET/@id} equals {@code orchestrator}; one configuration is
 * loaded for each {@code SCAN_SEQUENCE/SCAN} element it contains, in document order.
 *
 * @param isLookUpUrl  address of the lookup service
 * @param orchestrator action set identifier used to select the orchestrator profile
 * @return the configurations loaded for the orchestrator's scans
 * @throws ISLookUpException if a lookup query fails
 * @throws DocumentException if the orchestrator profile is not parseable XML
 */
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
    final ISLookUpService lookUp = ISLookupClientFactory.getLookUpService(isLookUpUrl);

    // fetch the orchestrator profile
    final String profileQuery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
    final String profileXml = lookUp.getResourceProfileByQuery(profileQuery);

    // parse it and pick up the action set id declared inside the profile
    final SAXReader xmlReader = new SAXReader();
    final Document profile = xmlReader.read(new StringReader(profileXml));
    final String actionSetId = profile.valueOf("//DEDUPLICATION/ACTION_SET/@id");

    // one configuration per SCAN element
    final List<DedupConfig> result = new ArrayList<>();
    for (final Object scan : profile.selectNodes("//SCAN_SEQUENCE/SCAN")) {
        final DedupConfig scanConfig = loadConfig(lookUp, actionSetId, scan);
        result.add(scanConfig);
    }
    return result;
}
/**
 * Loads the dedup configuration referenced by a single scan element.
 *
 * @param isLookUpService lookup service used to fetch the configuration profile
 * @param actionSetId     identifier stored on the loaded configuration as its configuration id
 * @param o               the scan node; must be a dom4j {@code Element} carrying an {@code id} attribute
 * @return the configuration parsed from the text of the profile's {@code DEDUPLICATION} element
 * @throws ISLookUpException if the lookup query fails
 */
private static DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
        throws ISLookUpException {
    final String scanId = ((Element) o).attributeValue("id");

    final String confQuery = String.format(
            "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
            scanId);
    final String rawConf = isLookUpService.getResourceProfileByQuery(confQuery);

    final DedupConfig loaded = DedupConfig.load(rawConf);
    loaded.getWf().setConfigurationId(actionSetId);
    return loaded;
}
}

View File

@ -1,8 +1,5 @@
package eu.dnetlib.dedup;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import eu.dnetlib.dedup.graph.ConnectedComponent;
import eu.dnetlib.dedup.graph.GraphProcessor;
@ -29,7 +26,7 @@ import java.util.List;
public class SparkCreateConnectedComponent {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
@ -50,7 +47,7 @@ public class SparkCreateConnectedComponent {
s -> new Tuple2<Object, String>(getHashcode(s), s)
);
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class));
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath, "",entity)).as(Encoders.bean(Relation.class));
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
@ -70,7 +67,7 @@ public class SparkCreateConnectedComponent {
tmp.add(r);
return tmp.stream();
}).iterator()).rdd(), Encoders.bean(Relation.class));
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,entity));
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,"",entity));
}
public static long getHashcode(final String id) {

View File

@ -0,0 +1,100 @@
package eu.dnetlib.dedup;
import com.google.common.hash.Hashing;
import eu.dnetlib.dedup.graph.ConnectedComponent;
import eu.dnetlib.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class SparkCreateConnectedComponent2 {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createCC_parameters.json")));
parser.parseArgument(args);
new SparkCreateConnectedComponent2().run(parser);
}
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity)
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction<String, Object, String>)
s -> new Tuple2<Object, String>(getHashcode(s), s)
);
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class));
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
c.getDocIds()
.stream()
.flatMap(id -> {
List<Relation> tmp = new ArrayList<>();
Relation r = new Relation();
r.setSource(c.getCcId());
r.setTarget(id);
r.setRelClass("merges");
tmp.add(r);
r = new Relation();
r.setTarget(c.getCcId());
r.setSource(id);
r.setRelClass("isMergedIn");
tmp.add(r);
return tmp.stream();
}).iterator()).rdd(), Encoders.bean(Relation.class));
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity));
}
}
}
public static long getHashcode(final String id) {
return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
return SparkSession
.builder()
.appName(SparkCreateSimRels2.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}

View File

@ -1,6 +1,5 @@
package eu.dnetlib.dedup;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
@ -10,7 +9,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
@ -29,7 +27,7 @@ import java.util.List;
public class SparkCreateSimRels {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()

View File

@ -22,6 +22,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.DocumentException;
@ -39,7 +40,7 @@ public class SparkCreateSimRels2 implements Serializable {
private static final Log log = LogFactory.getLog(SparkCreateSimRels2.class);
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
parser.parseArgument(args);
new SparkCreateSimRels2().run(parser);
@ -48,12 +49,11 @@ public class SparkCreateSimRels2 implements Serializable {
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
//read oozie parameters
final String rawGraphBasePath = parser.get("rawGraphBasePath");
final String graphBasePath = parser.get("graphBasePath");
final String rawSet = parser.get("rawSet");
final String agentId = parser.get("agentId");
final String agentName = parser.get("agentName");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -62,11 +62,11 @@ public class SparkCreateSimRels2 implements Serializable {
JavaRDD<Tuple2<Text,Text>> simRel = sc.emptyRDD();
//for each dedup configuration
for (DedupConfig dedupConf: getConfigurations(isLookUpUrl, actionSetId)) {
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(rawGraphBasePath + "/" + subEntity)
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(graphBasePath + "/" + subEntity)
.mapToPair(s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);
@ -78,59 +78,54 @@ public class SparkCreateSimRels2 implements Serializable {
//create relations by comparing only elements in the same group
final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2()));
JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
//save the simrel in the workingdir
spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
//create atomic actions
JavaRDD<Tuple2<Text, Text>> newSimRels = relationsRDD
.mapToPair(rel ->
new Tuple2<>(
createActionId(rel.getSource(), rel.getTarget(), entity), //TODO update the type, maybe take it from the configuration?
new AtomicAction(rawSet, new Agent(agentId, agentName, Agent.AGENT_TYPE.service), rel.getSource(), "isSimilarTo", rel.getTarget(), new ObjectMapper().writeValueAsString(rel).getBytes())))
.map(aa -> new Tuple2<>(aa._1(), transformAction(aa._2())));
.map(this::createSequenceFileRow);
simRel = simRel.union(newSimRels);
}
simRel.mapToPair(r -> r)
.saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
}
/**
 * Builds the action identifier {@code source@type@target}, where the type depends on the
 * entity: a dedicated similarity type for {@code result} and {@code organization}, the
 * empty string for anything else.
 *
 * @param source identifier of the relation source
 * @param target identifier of the relation target
 * @param entity entity type of the compared records
 * @return the identifier wrapped in a {@code Text}
 */
public Text createActionId(String source, String target, String entity) {
    final String type;
    if (entity.equals("result")) {
        type = "resultResult_dedupSimilarity_isSimilarTo";
    } else if (entity.equals("organization")) {
        type = "organizationOrganization_dedupSimilarity_isSimilarTo";
    } else {
        type = "";
    }
    return new Text(source + "@" + type + "@" + target);
}
public Text transformAction(AtomicAction aa) throws JsonProcessingException {
public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
return new Text(mapper.writeValueAsString(aa));
String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
//TODO to be replaced by the new implementation of AtomicAction
AtomicAction aa = new AtomicAction("rawSet", new Agent("agentId", "agentName", Agent.AGENT_TYPE.service), relation.getSource(), relation.getRelClass(), relation.getTarget(), new ObjectMapper().writeValueAsString(relation).getBytes());
return new Tuple2<>(
new Text(id),
new Text(mapper.writeValueAsString(aa))
);
}
public Relation createSimRel(String source, String target){
public Relation createSimRel(String source, String target, String entity){
final Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setRelClass("isSimilarTo");
switch(entity){
case "result":
r.setRelClass("resultResult_dedupSimilarity_isSimilarTo");
break;
case "organization":
r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo");
break;
default:
r.setRelClass("isSimilarTo");
break;
}
return r;
}
@ -146,39 +141,4 @@ public class SparkCreateSimRels2 implements Serializable {
.getOrCreate();
}
/**
 * Loads every dedup configuration referenced by an orchestrator profile.
 * <p>
 * The orchestrator profile is the resource profile whose
 * {@code DEDUPLICATION/ACTION_SET/@id} equals {@code orchestrator}; one configuration is
 * loaded for each {@code SCAN_SEQUENCE/SCAN} element it contains, in document order.
 *
 * @param isLookUpUrl  address of the lookup service
 * @param orchestrator action set identifier used to select the orchestrator profile
 * @return the configurations loaded for the orchestrator's scans
 * @throws ISLookUpException if a lookup query fails
 * @throws DocumentException if the orchestrator profile is not parseable XML
 */
public List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
    final ISLookUpService lookUp = ISLookupClientFactory.getLookUpService(isLookUpUrl);

    // fetch the orchestrator profile
    final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
    log.info("loading dedup orchestration: " + xquery);
    final String profileXml = lookUp.getResourceProfileByQuery(xquery);

    // parse it and pick up the action set id declared inside the profile
    final SAXReader xmlReader = new SAXReader();
    final Document profile = xmlReader.read(new StringReader(profileXml));
    final String actionSetId = profile.valueOf("//DEDUPLICATION/ACTION_SET/@id");

    // one configuration per SCAN element
    final List<DedupConfig> result = new ArrayList<>();
    for (final Object scan : profile.selectNodes("//SCAN_SEQUENCE/SCAN")) {
        final DedupConfig scanConfig = loadConfig(lookUp, actionSetId, scan);
        result.add(scanConfig);
    }
    return result;
}
/**
 * Loads the dedup configuration referenced by a single scan element.
 *
 * @param isLookUpService lookup service used to fetch the configuration profile
 * @param actionSetId     identifier stored on the loaded configuration as its configuration id
 * @param o               the scan node; must be a dom4j {@code Element} carrying an {@code id} attribute
 * @return the configuration parsed from the text of the profile's {@code DEDUPLICATION} element
 * @throws ISLookUpException if the lookup query fails
 */
private DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
        throws ISLookUpException {
    final String scanId = ((Element) o).attributeValue("id");

    final String confQuery = String.format(
            "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
            scanId);
    final String conf = isLookUpService.getResourceProfileByQuery(confQuery);
    log.debug("loaded dedup configuration from IS profile: " + conf);

    final DedupConfig loaded = DedupConfig.load(conf);
    loaded.getWf().setConfigurationId(actionSetId);
    return loaded;
}
}

View File

@ -0,0 +1,38 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "rawSet",
"paramDescription": "the raw set to be saved (full path)",
"paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url for the lookup service",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path for the working directory",
"paramRequired": true
}
]

View File

@ -30,15 +30,9 @@
"paramRequired": true
},
{
"paramName": "ai",
"paramLongName": "agentId",
"paramDescription": "the agent identifier",
"paramRequired": true
},
{
"paramName": "an",
"paramLongName": "agentName",
"paramDescription": "the agent name",
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
}
]

View File

@ -1,25 +1,13 @@
<workflow-app name="Create Similarity Relations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>rawGraphBasePath</name>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>actionSetBasePath</name>
<description>the output base path</description>
</property>
<property>
<name>rawSet</name>
<description>the output directory in the targetPath</description>
</property>
<property>
<name>agentId</name>
<description>the agent identifier</description>
</property>
<property>
<name>agentName</name>
<description>the agent name</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
@ -28,6 +16,10 @@
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -42,15 +34,15 @@
</property>
</parameters>
<start to="DeleteTargetPath"/>
<start to="DeleteWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteTargetPath">
<action name="DeleteWorkingPath">
<fs>
<delete path='${targetPath}/${rawSet}'/>
<delete path='${workingPath}/${actionSetId}/*_simrel'/>
</fs>
<ok to="DuplicateScan"/>
<error to="Kill"/>
@ -74,8 +66,6 @@
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${rawGraphBasePath}</arg>
<arg>--o</arg><arg>${rawSet}</arg>
<arg>--ai</arg><arg>${agentId}</arg>
<arg>--an</arg><arg>${agentName}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>