implementation of the lookup procedure to take dedup conf from the resource profiles

This commit is contained in:
miconis 2020-03-18 17:41:56 +01:00
parent f32eae5ce9
commit 679b5869e5
5 changed files with 113 additions and 78 deletions

View File

@ -2,8 +2,13 @@ package eu.dnetlib.dedup;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.actionmanager.actions.AtomicAction;
import eu.dnetlib.actionmanager.common.Agent;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
@ -17,47 +22,38 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import scala.Tuple2;
import eu.dnetlib.actionmanager.actions.AtomicAction;
import eu.dnetlib.actionmanager.common.Agent;
import java.io.Serializable;
import java.util.Arrays;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class SparkCreateSimRels2 implements Serializable {
final static String CONF_SEPARATOR = "@@@";
private static final Log log = LogFactory.getLog(SparkCreateSimRels2.class);
public static List<DedupConfig> decompressConfs(String compressedConfs){
return Arrays.stream(compressedConfs.split(CONF_SEPARATOR))
.map(ArgumentApplicationParser::decompressValue)
.map(DedupConfig::load)
.collect(Collectors.toList());
}
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
parser.parseArgument(args);
new SparkCreateSimRels2().run(parser, decompressConfs(parser.get("dedupConf")));
new SparkCreateSimRels2().run(parser);
}
private void run(ArgumentApplicationParser parser, List<DedupConfig> dedupConfs) {
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
//read oozie parameters
final String sourcePath = parser.get("sourcePath");
final String targetPath = parser.get("targetPath");
final String rawSetName = parser.get("rawSet");
final String rawGraphBasePath = parser.get("rawGraphBasePath");
final String rawSet = parser.get("rawSet");
final String agentId = parser.get("agentId");
final String agentName = parser.get("agentName");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -66,10 +62,11 @@ public class SparkCreateSimRels2 implements Serializable {
JavaRDD<Tuple2<Text,Text>> simRel = sc.emptyRDD();
//for each dedup configuration
for (DedupConfig dedupConf: dedupConfs) {
for (DedupConfig dedupConf: getConfigurations(isLookUpUrl, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(sourcePath + "/" + entity)
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(rawGraphBasePath + "/" + subEntity)
.mapToPair(s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);
@ -88,25 +85,35 @@ public class SparkCreateSimRels2 implements Serializable {
.mapToPair(rel ->
new Tuple2<>(
createActionId(rel.getSource(), rel.getTarget(), entity), //TODO update the type, maybe take it from the configuration?
new AtomicAction(rawSetName, new Agent(agentId, agentName, Agent.AGENT_TYPE.service), rel.getSource(), "isSimilarTo", rel.getTarget(), new ObjectMapper().writeValueAsString(rel).getBytes())))
new AtomicAction(rawSet, new Agent(agentId, agentName, Agent.AGENT_TYPE.service), rel.getSource(), "isSimilarTo", rel.getTarget(), new ObjectMapper().writeValueAsString(rel).getBytes())))
.map(aa -> new Tuple2<>(aa._1(), transformAction(aa._2())));
simRel = simRel.union(newSimRels);
}
String targetDirectory = targetPath + "/" + rawSetName;
// simRel.map(s -> s._1().toString()).saveAsTextFile(targetDirectory);
simRel.mapToPair(r -> r)
.saveAsHadoopFile(targetDirectory, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
.saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
}
public Text createActionId(String source, String target, String type) {
public Text createActionId(String source, String target, String entity) {
String type = "";
switch(entity){
case "result":
type = "resultResult_dedupSimilarity_isSimilarTo";
break;
case "organization":
type = "organizationOrganization_dedupSimilarity_isSimilarTo";
break;
default:
break;
}
String id = source + "@" + type + "@" + target;
return new Text(id);
@ -135,8 +142,43 @@ public class SparkCreateSimRels2 implements Serializable {
.appName(SparkCreateSimRels2.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
// .enableHiveSupport()
.enableHiveSupport()
.getOrCreate();
}
public List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
log.info("loading dedup orchestration: " + xquery);
String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery);
final Document doc = new SAXReader().read(new StringReader(orchestratorProfile));
final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
final List<DedupConfig> configurations = new ArrayList<>();
for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
configurations.add(loadConfig(isLookUpService, actionSetId, o));
}
return configurations;
}
private DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
throws ISLookUpException {
final Element s = (Element) o;
final String configProfileId = s.attributeValue("id");
final String conf =
isLookUpService.getResourceProfileByQuery(String.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
configProfileId));
log.debug("loaded dedup configuration from IS profile: " + conf);
final DedupConfig dedupConfig = DedupConfig.load(conf);
dedupConfig.getWf().setConfigurationId(actionSetId);
return dedupConfig;
}
}

View File

@ -6,33 +6,27 @@
"paramRequired": true
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "rawGraphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "e",
"paramLongName": "entity",
"paramDescription": "the type of entity to be deduped (directory in the sourcePath)",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "dedupConf",
"paramDescription": "list of dedup configuration to be used",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetPath",
"paramDescription": "target base path to save dedup result (actions)",
"paramRequired": true
},
{
"paramName": "rs",
"paramName": "o",
"paramLongName": "rawSet",
"paramDescription": "the raw set to be saved (directory in the targetPath)",
"paramDescription": "the raw set to be saved (full path)",
"paramRequired": true
},
{

View File

@ -1,19 +1,11 @@
<workflow-app name="Create Similarity Relations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<name>rawGraphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>entity</name>
<description>the entity that should be processed</description>
</property>
<property>
<name>dedupConf</name>
<description>the (list of) dedup Configuration(s)</description>
</property>
<property>
<name>targetPath</name>
<name>actionSetBasePath</name>
<description>the output base path</description>
</property>
<property>
@ -28,6 +20,14 @@
<name>agentName</name>
<description>the agent name</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -72,13 +72,12 @@
spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--entity</arg><arg>${entity}</arg>
<arg>--dedupConf</arg><arg>${dedupConf}</arg>
<arg>--rawSet</arg><arg>${rawSet}</arg>
<arg>--agentId</arg><arg>${agentId}</arg>
<arg>--agentName</arg><arg>${agentName}</arg>
<arg>--i</arg><arg>${rawGraphBasePath}</arg>
<arg>--o</arg><arg>${rawSet}</arg>
<arg>--ai</arg><arg>${agentId}</arg>
<arg>--an</arg><arg>${agentName}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -20,13 +20,11 @@ import java.util.Set;
public class SparkCreateDedupTest {
String configuration;
String configuration2;
String entity = "publication";
String entity = "organization";
@Before
public void setUp() throws IOException {
configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org1.curr.conf.json"));
configuration2 = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org2.curr.conf.json"));
configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
}
@Test
@ -48,11 +46,12 @@ public class SparkCreateDedupTest {
"-mt", "local[*]",
"-s", "/Users/miconis/dumps",
"-e", entity,
"-c", ArgumentApplicationParser.compressArgument(configuration) + "@@@" + ArgumentApplicationParser.compressArgument(configuration2),
"-t", "/tmp/dedup",
"-rs", "rawset_test",
"-c", ArgumentApplicationParser.compressArgument(configuration),
"-rs", "/tmp/dedup/rawset_test",
"-ai", "agentId",
"-an", "agentName"
"-an", "agentName",
"-asi", "dedup-similarity-result-levenstein",
"-la", "lookupurl",
});
}

View File

@ -3,6 +3,7 @@
"threshold" : "0.99",
"dedupRun" : "001",
"entityType" : "organization",
"subEntityValue": "organization",
"orderField" : "legalname",
"queueMaxSize" : "2000",
"groupMaxSize" : "50",