Fixed an error in the treeprocessor: it used th=-1 as the default value, now it uses th=1.
This commit is contained in:
parent 9e8ea8f6ee
commit 5021e5048f
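For context, the treeprocessor fix is the one-line change in the TreeNodeDef hunk at the bottom of this diff: the fallback value read for a comparator's "threshold" parameter moves from -1.0 to 1.0. A minimal before/after sketch of the affected call, copied from that hunk (the surrounding FieldStats constructor arguments are unchanged):

    // before the fix: a comparator with no explicit "threshold" param fell back to -1.0
    Double.parseDouble(fieldConf.getParams().getOrDefault("threshold", "-1.0"))

    // after the fix: the fallback is 1.0 (th=1), as stated in the commit message
    Double.parseDouble(fieldConf.getParams().getOrDefault("threshold", "1.0"))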
@@ -0,0 +1,4 @@
+entitiesPath = /tmp/graph_openorgs_and_corda/organization
+workingPath = /tmp/openorgs_test/workingpath
+dedupConfPath = /tmp/openorgs_test/organization.strict.conf.json
+numPartitions = 40
@@ -1,5 +1,6 @@
package eu.dnetlib;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hashing;
import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.config.DedupConfig;

@@ -11,6 +12,7 @@ import eu.dnetlib.reporter.SparkReporter;
import eu.dnetlib.support.Block;
import eu.dnetlib.support.ConnectedComponent;
import eu.dnetlib.support.Relation;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;

@@ -27,7 +29,9 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import scala.Serializable;
import scala.Tuple2;
import scala.math.Ordering;

import java.nio.charset.Charset;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -36,6 +40,8 @@ public class Deduper implements Serializable {

    private static final Log log = LogFactory.getLog(Deduper.class);

+   private static ObjectMapper mapper = new ObjectMapper();
+
    public static JavaPairRDD<String, Block> createSortedBlocks(
            JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
        final String of = config.getWf().getOrderField();

@@ -71,7 +77,7 @@ public class Deduper implements Serializable {
    }

    public static long hash(final String id) {
-       return Hashing.murmur3_128().hashString(id).asLong();
+       return Hashing.murmur3_128().hashString(id, Charset.defaultCharset()).asLong();
    }

    public static ConnectedComponent entityMerger(String key, Iterator<String> values) {

@@ -79,7 +85,7 @@ public class Deduper implements Serializable {
        ConnectedComponent cc = new ConnectedComponent();
        cc.setCcId(key);
        cc.setDocs(StreamSupport.stream(Spliterators.spliteratorUnknownSize(values, Spliterator.ORDERED), false)
-               .collect(Collectors.toSet()));
+               .collect(Collectors.toCollection(HashSet::new)));
        return cc;
    }

@@ -115,6 +121,13 @@ public class Deduper implements Serializable {
        // create blocks for deduplication
        JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);

+       //TODO test purpose
+       blocks.foreach(b -> System.out.println("b = " + b));
+       blocks = blocks.filter(b -> b._1().equals("ghahos"));
+
        // create relations by comparing only elements in the same group
        JavaRDD<Relation> relations = Deduper.computeRelations(sc, blocks, dedupConf);

@@ -145,15 +158,18 @@ public class Deduper implements Serializable {
                .map(Relation::toEdgeRdd)
                .rdd();

-       final Dataset<Relation> mergeRels = spark
-           .createDataset(
-               GraphProcessor
+       JavaRDD<ConnectedComponent> ccs = GraphProcessor
                .findCCs(vertexes.rdd(), edgeRdd, maxIterations)
-               .toJavaRDD()
+               .toJavaRDD();
+
+       JavaRDD<Relation> mergeRel = ccs
                .filter(k -> k.getDocs().size() > 1)
                .flatMap(cc -> ccToMergeRel(cc, dedupConf))
-               .map(it -> new Relation(it._1(), it._2(), "mergeRel"))
-               .rdd(),
+               .map(it -> new Relation(it._1(), it._2(), "mergeRel"));
+
+       final Dataset<Relation> mergeRels = spark
+           .createDataset(
+               mergeRel.rdd(),
                Encoders.bean(Relation.class));

        mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelsPath);
@@ -0,0 +1,48 @@
+package eu.dnetlib.jobs;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.utils.Utility;
+import eu.dnetlib.support.ArgumentApplicationParser;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+abstract class AbstractSparkJob implements Serializable {
+
+    protected static final int NUM_PARTITIONS = 1000;
+
+    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    public ArgumentApplicationParser parser; // parameters for the spark action
+    public SparkSession spark; // the spark session
+
+    public AbstractSparkJob() {}
+
+    public AbstractSparkJob(ArgumentApplicationParser parser, SparkSession spark) {
+
+        this.parser = parser;
+        this.spark = spark;
+    }
+
+    abstract void run();
+
+    protected static SparkSession getSparkSession(SparkConf conf) {
+        return SparkSession.builder().config(conf).getOrCreate();
+    }
+
+    protected static <T> void save(Dataset<T> dataset, String outPath, SaveMode mode) {
+        dataset.write().option("compression", "gzip").mode(mode).json(outPath);
+    }
+
+    protected static DedupConfig loadDedupConfig(String dedupConfPath) {
+        return DedupConfig.load(Utility.readFromClasspath("/eu/dnetlib/pace/config/organization.strict.conf.json", AbstractSparkJob.class));
+    }
+
+}
@@ -0,0 +1,63 @@
+package eu.dnetlib.jobs;
+
+import eu.dnetlib.Deduper;
+import eu.dnetlib.pace.utils.Utility;
+import eu.dnetlib.support.ArgumentApplicationParser;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+public class SparkCreateDedupEntity extends AbstractSparkJob {
+
+    private static final Logger log = LoggerFactory.getLogger(eu.dnetlib.jobs.SparkCreateDedupEntity.class);
+
+    public SparkCreateDedupEntity(ArgumentApplicationParser parser, SparkSession spark) {
+        super(parser, spark);
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                Utility.readFromClasspath("/eu/dnetlib/pace/createDedupEntity_parameters.json", SparkCreateDedupEntity.class)
+        );
+
+        parser.parseArgument(args);
+
+        SparkConf conf = new SparkConf();
+
+        new SparkCreateDedupEntity(
+                parser,
+                getSparkSession(conf)
+        ).run();
+    }
+
+    @Override
+    public void run() {
+
+        // read oozie parameters
+        final String entitiesPath = parser.get("entitiesPath");
+        final String workingPath = parser.get("workingPath");
+        final String dedupConfPath = parser.get("dedupConfPath");
+        final int numPartitions = Optional
+                .ofNullable(parser.get("numPartitions"))
+                .map(Integer::valueOf)
+                .orElse(NUM_PARTITIONS);
+
+        log.info("entitiesPath: '{}'", entitiesPath);
+        log.info("workingPath: '{}'", workingPath);
+        log.info("dedupConfPath: '{}'", dedupConfPath);
+        log.info("numPartitions: '{}'", numPartitions);
+
+        Deduper.createDedupEntity(
+                loadDedupConfig(dedupConfPath),
+                workingPath + "/mergerels",
+                entitiesPath,
+                spark,
+                workingPath + "/dedupentity"
+        );
+    }
+
+}
@@ -0,0 +1,62 @@
+package eu.dnetlib.jobs;
+
+import eu.dnetlib.Deduper;
+import eu.dnetlib.pace.utils.Utility;
+import eu.dnetlib.support.ArgumentApplicationParser;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+public class SparkCreateMergeRels extends AbstractSparkJob {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
+
+    public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
+        super(parser, spark);
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                Utility.readFromClasspath("/eu/dnetlib/pace/createMergeRels_parameters.json", SparkCreateSimRels.class)
+        );
+
+        parser.parseArgument(args);
+
+        SparkConf conf = new SparkConf();
+
+        new SparkCreateSimRels(
+                parser,
+                getSparkSession(conf)
+        ).run();
+    }
+
+    @Override
+    public void run() {
+
+        // read oozie parameters
+        final String entitiesPath = parser.get("entitiesPath");
+        final String workingPath = parser.get("workingPath");
+        final String dedupConfPath = parser.get("dedupConfPath");
+        final int numPartitions = Optional
+                .ofNullable(parser.get("numPartitions"))
+                .map(Integer::valueOf)
+                .orElse(NUM_PARTITIONS);
+
+        log.info("entitiesPath: '{}'", entitiesPath);
+        log.info("workingPath: '{}'", workingPath);
+        log.info("dedupConfPath: '{}'", dedupConfPath);
+        log.info("numPartitions: '{}'", numPartitions);
+
+        Deduper.createMergeRels(
+                loadDedupConfig(dedupConfPath),
+                entitiesPath,
+                workingPath + "/mergerels",
+                workingPath + "/simrels",
+                spark
+        );
+    }
+}
@@ -0,0 +1,62 @@
+package eu.dnetlib.jobs;
+
+import eu.dnetlib.Deduper;
+import eu.dnetlib.pace.utils.Utility;
+import eu.dnetlib.support.ArgumentApplicationParser;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+public class SparkCreateSimRels extends AbstractSparkJob {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
+
+    public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) {
+        super(parser, spark);
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                Utility.readFromClasspath("/eu/dnetlib/pace/parameters/createSimRels_parameters.json", SparkCreateSimRels.class)
+        );
+
+        parser.parseArgument(args);
+
+        SparkConf conf = new SparkConf();
+
+        new SparkCreateSimRels(
+                parser,
+                getSparkSession(conf)
+        ).run();
+    }
+
+    @Override
+    public void run() {
+
+        // read oozie parameters
+        final String entitiesPath = parser.get("entitiesPath");
+        final String workingPath = parser.get("workingPath");
+        final String dedupConfPath = parser.get("dedupConfPath");
+        final int numPartitions = Optional
+                .ofNullable(parser.get("numPartitions"))
+                .map(Integer::valueOf)
+                .orElse(NUM_PARTITIONS);
+
+        log.info("entitiesPath: '{}'", entitiesPath);
+        log.info("workingPath: '{}'", workingPath);
+        log.info("dedupConfPath: '{}'", dedupConfPath);
+        log.info("numPartitions: '{}'", numPartitions);
+
+        Deduper.createSimRels(
+                loadDedupConfig(dedupConfPath),
+                spark,
+                entitiesPath,
+                workingPath + "/simrels"
+        );
+    }
+}
@@ -0,0 +1,95 @@
+package eu.dnetlib.support;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.*;
+import org.apache.commons.io.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class ArgumentApplicationParser implements Serializable {
+
+    private final Options options = new Options();
+    private final Map<String, String> objectMap = new HashMap<>();
+
+    private final List<String> compressedValues = new ArrayList<>();
+
+    public ArgumentApplicationParser(final String json_configuration) throws Exception {
+        final ObjectMapper mapper = new ObjectMapper();
+        final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class);
+        createOptionMap(configuration);
+    }
+
+    public ArgumentApplicationParser(final OptionsParameter[] configuration) {
+        createOptionMap(configuration);
+    }
+
+    private void createOptionMap(final OptionsParameter[] configuration) {
+
+        Arrays
+                .stream(configuration)
+                .map(
+                        conf -> {
+                            final Option o = new Option(conf.getParamName(), true, conf.getParamDescription());
+                            o.setLongOpt(conf.getParamLongName());
+                            o.setRequired(conf.isParamRequired());
+                            if (conf.isCompressed()) {
+                                compressedValues.add(conf.getParamLongName());
+                            }
+                            return o;
+                        })
+                .forEach(options::addOption);
+
+        // HelpFormatter formatter = new HelpFormatter();
+        // formatter.printHelp("myapp", null, options, null, true);
+
+    }
+
+    public static String decompressValue(final String abstractCompressed) {
+        try {
+            byte[] byteArray = org.apache.commons.codec.binary.Base64.decodeBase64(abstractCompressed.getBytes());
+            GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(byteArray));
+            final StringWriter stringWriter = new StringWriter();
+            IOUtils.copy(gis, stringWriter);
+            return stringWriter.toString();
+        } catch (Throwable e) {
+            System.out.println("Wrong value to decompress:" + abstractCompressed);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static String compressArgument(final String value) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        GZIPOutputStream gzip = new GZIPOutputStream(out);
+        gzip.write(value.getBytes());
+        gzip.close();
+        return java.util.Base64.getEncoder().encodeToString(out.toByteArray());
+    }
+
+    public void parseArgument(final String[] args) throws Exception {
+        CommandLineParser parser = new BasicParser();
+        CommandLine cmd = parser.parse(options, args);
+        Arrays
+                .stream(cmd.getOptions())
+                .forEach(
+                        it -> objectMap
+                                .put(
+                                        it.getLongOpt(),
+                                        compressedValues.contains(it.getLongOpt())
+                                                ? decompressValue(it.getValue())
+                                                : it.getValue()));
+    }
+
+    public String get(final String key) {
+        return objectMap.get(key);
+    }
+
+    public Map<String, String> getObjectMap() {
+        return objectMap;
+    }
+}
@@ -89,6 +89,13 @@ public class Block implements Serializable {
        return documents.size();
    }

+   @Override
+   public String toString() {
+       return "Block{" +
+               "key='" + key + '\'' +
+               ", size=" + documents.size() + '\'' +
+               ", names=" + documents.stream().map(d -> d.getFieldMap().get("country").stringValue()).collect(Collectors.toList()) + '\'' +
+               '}';
+   }
}
@@ -2,6 +2,7 @@ package eu.dnetlib.support;

import java.io.IOException;
import java.io.Serializable;
+import java.util.HashSet;
import java.util.Set;

import eu.dnetlib.pace.utils.Utility;

@@ -14,14 +15,14 @@ import eu.dnetlib.pace.util.PaceException;

public class ConnectedComponent implements Serializable {

-   private Set<String> docs;
+   private HashSet<String> docs;
    private String ccId;

    public ConnectedComponent() {
    }

    public ConnectedComponent(Set<String> docs) {
-       this.docs = docs;
+       this.docs = new HashSet<>(docs);
        createID();
    }

@@ -68,7 +69,7 @@ public class ConnectedComponent implements Serializable {
        return docs;
    }

-   public void setDocs(Set<String> docs) {
+   public void setDocs(HashSet<String> docs) {
        this.docs = docs;
    }

@@ -0,0 +1,37 @@
+package eu.dnetlib.support;
+
+public class OptionsParameter {
+
+    private String paramName;
+    private String paramLongName;
+    private String paramDescription;
+    private boolean paramRequired;
+    private boolean compressed;
+
+    public OptionsParameter() {
+    }
+
+    public String getParamName() {
+        return paramName;
+    }
+
+    public String getParamLongName() {
+        return paramLongName;
+    }
+
+    public String getParamDescription() {
+        return paramDescription;
+    }
+
+    public boolean isParamRequired() {
+        return paramRequired;
+    }
+
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
+}
@@ -47,4 +47,13 @@ public class Relation implements Serializable {
    public Edge<String> toEdgeRdd(){
        return new Edge<>(Deduper.hash(source), Deduper.hash(target), type);
    }
+
+   @Override
+   public String toString() {
+       return "Relation{" +
+               "source='" + source + '\'' +
+               ", target='" + target + '\'' +
+               ", type='" + type + '\'' +
+               '}';
+   }
}
@@ -0,0 +1,18 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
@@ -0,0 +1,167 @@
+<workflow-app name="Deduplication WF" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>entitiesPath</name>
+            <description>the input entity path</description>
+        </property>
+        <property>
+            <name>workingPath</name>
+            <description>path for the working directory</description>
+        </property>
+        <property>
+            <name>numPartitions</name>
+            <description>number of partitions for the spark files</description>
+        </property>
+        <property>
+            <name>dedupConfPath</name>
+            <description>path for the dedup configuration file</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="resetWorkingPath"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="resetWorkingPath">
+        <fs>
+            <delete path="${workingPath}"/>
+        </fs>
+        <ok to="CreateSimRels"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="CreateSimRels">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Create Similarity Relations</name>
+            <class>eu.dnetlib.jobs.SparkCreateSimRels</class>
+            <jar>dnet-dedup-test-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=3840
+            </spark-opts>
+            <arg>--entitiesPath</arg><arg>${entitiesPath}</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--numPartitions</arg><arg>${numPartitions}</arg>
+            <arg>--dedupConfPath</arg><arg>${dedupConfPath}</arg>
+        </spark>
+        <ok to="CreateMergeRels"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="CreateMergeRels">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Create Merge Relations</name>
+            <class>eu.dnetlib.jobs.SparkCreateMergeRels</class>
+            <jar>dhp-dedup-test-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=3840
+            </spark-opts>
+            <arg>--entitiesPath</arg><arg>${entitiesPath}</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--numPartitions</arg><arg>${numPartitions}</arg>
+            <arg>--dedupConfPath</arg><arg>${dedupConfPath}</arg>
+        </spark>
+        <ok to="CreateDedupEntities"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="CreateDedupEntities">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Create Dedup Entities</name>
+            <class>eu.dnetlib.jobs.SparkCreateDedupEntity</class>
+            <jar>dhp-dedup-test-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=3840
+            </spark-opts>
+            <arg>--entitiesPath</arg><arg>${entitiesPath}</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--numPartitions</arg><arg>${numPartitions}</arg>
+            <arg>--dedupConfPath</arg><arg>${dedupConfPath}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
@@ -1,8 +1,12 @@
package eu.dnetlib.pace;

import eu.dnetlib.Deduper;
+import eu.dnetlib.jobs.SparkCreateDedupEntity;
+import eu.dnetlib.jobs.SparkCreateMergeRels;
+import eu.dnetlib.jobs.SparkCreateSimRels;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.utils.Utility;
+import eu.dnetlib.support.ArgumentApplicationParser;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.Before;

@@ -16,12 +20,15 @@ public class DedupLocalTest extends DedupTestUtils {
    DedupConfig config;
    JavaSparkContext context;

-   final String entitiesPath = "/Users/miconis/Desktop/publications_to_fix.json";
+   final String entitiesPath = "/Users/miconis/IdeaProjects/DnetDedup/dnet-dedup/dnet-dedup-test/src/test/resources/eu/dnetlib/pace/examples/openorgs.to.fix.json";
    final String workingPath = "/tmp/working_dir";
+   final String numPartitions = "10";
+   final String dedupConfPath = "/eu/dnetlib/pace/config/organization.strict.conf.json";

    @Before
    public void setup() {

-       config = DedupConfig.load(Utility.readFromClasspath("/eu/dnetlib/pace/config/publication.current.conf.json", DedupLocalTest.class));
+       config = DedupConfig.load(Utility.readFromClasspath("/eu/dnetlib/pace/config/organization.strict.conf.json", DedupLocalTest.class));

        spark = SparkSession
                .builder()

@@ -32,6 +39,63 @@ public class DedupLocalTest extends DedupTestUtils {

    }

+   @Test
+   public void createSimRelTest() throws Exception {
+
+       ArgumentApplicationParser parser = new ArgumentApplicationParser(Utility.readFromClasspath("/eu/dnetlib/pace/parameters/createSimRels_parameters.json", SparkCreateSimRels.class));
+
+       parser.parseArgument(
+               new String[] {
+                       "-e", entitiesPath,
+                       "-w", workingPath,
+                       "-np", numPartitions,
+                       "-dc", dedupConfPath
+               });
+
+       new SparkCreateSimRels(
+               parser,
+               spark
+       ).run();
+   }
+
+   @Test
+   public void createMergeRelTest() throws Exception {
+
+       ArgumentApplicationParser parser = new ArgumentApplicationParser(Utility.readFromClasspath("/eu/dnetlib/pace/parameters/createMergeRels_parameters.json", SparkCreateMergeRels.class));
+
+       parser.parseArgument(
+               new String[] {
+                       "-e", entitiesPath,
+                       "-w", workingPath,
+                       "-np", numPartitions,
+                       "-dc", dedupConfPath
+               });
+
+       new SparkCreateMergeRels(
+               parser,
+               spark
+       ).run();
+   }
+
+   @Test
+   public void createDedupEntityTest() throws Exception {
+
+       ArgumentApplicationParser parser = new ArgumentApplicationParser(Utility.readFromClasspath("/eu/dnetlib/pace/parameters/createDedupEntity_parameters.json", SparkCreateDedupEntity.class));
+
+       parser.parseArgument(
+               new String[] {
+                       "-e", entitiesPath,
+                       "-w", workingPath,
+                       "-np", numPartitions,
+                       "-dc", dedupConfPath
+               });
+
+       new SparkCreateDedupEntity(
+               parser,
+               spark
+       ).run();
+   }
+
    @Test
    public void deduplicationTest() {

@@ -71,6 +71,13 @@
        "weight": 1,
        "countIfUndefined": "false",
        "params": {}
      },
+     {
+       "field": "rorid",
+       "comparator": "exactMatch",
+       "weight": 1,
+       "countIfUndefined": "false",
+       "params": {}
+     }
    ],
    "threshold": 1,

@@ -115,8 +122,8 @@
      "aggregation": "AND",
      "positive": "layer3",
      "negative": "NO_MATCH",
-     "undefined": "layer3",
-     "ignoreUndefined": "true"
+     "undefined": "NO_MATCH",
+     "ignoreUndefined": "false"
    },
    "layer3": {
      "fields": [

@@ -184,11 +191,12 @@
      }
    },
    "model" : [
-     { "name" : "country", "type" : "String", "path" : "$.organization.metadata.country.classid"},
-     { "name" : "legalshortname", "type" : "String", "path" : "$.organization.metadata.legalshortname.value"},
-     { "name" : "legalname", "type" : "String", "path" : "$.organization.metadata.legalname.value" },
-     { "name" : "websiteurl", "type" : "URL", "path" : "$.organization.metadata.websiteurl.value" },
-     { "name" : "gridid", "type" : "String", "path" : "$.pid[?(@.qualifier.classid =='grid.ac')].value"},
+     { "name" : "country", "type" : "String", "path" : "$.country.classid"},
+     { "name" : "legalshortname", "type" : "String", "path" : "$.legalshortname.value"},
+     { "name" : "legalname", "type" : "String", "path" : "$.legalname.value" },
+     { "name" : "websiteurl", "type" : "URL", "path" : "$.websiteurl.value" },
+     { "name" : "gridid", "type" : "String", "path" : "$.pid[?(@.qualifier.classid =='GRID')].value"},
+     { "name" : "rorid", "type" : "String", "path" : "$.pid[?(@.qualifier.classid =='ROR')].value"},
      { "name" : "originalId", "type" : "String", "path" : "$.id" }
    ],
    "blacklists" : {
File diff suppressed because one or more lines are too long
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "e",
+    "paramLongName": "entitiesPath",
+    "paramDescription": "the input entities",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "path of the working directory",
+    "paramRequired": true
+  },
+  {
+    "paramName": "np",
+    "paramLongName": "numPartitions",
+    "paramDescription": "number of partitions for the similarity relations intermediate phases",
+    "paramRequired": false
+  },
+  {
+    "paramName": "dc",
+    "paramLongName": "dedupConfPath",
+    "paramDescription": "path of the dedup configuration",
+    "paramRequired": false
+  }
+]
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "e",
+    "paramLongName": "entitiesPath",
+    "paramDescription": "the input entities",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "path of the working directory",
+    "paramRequired": true
+  },
+  {
+    "paramName": "np",
+    "paramLongName": "numPartitions",
+    "paramDescription": "number of partitions for the similarity relations intermediate phases",
+    "paramRequired": false
+  },
+  {
+    "paramName": "dc",
+    "paramLongName": "dedupConfPath",
+    "paramDescription": "path of the dedup configuration",
+    "paramRequired": false
+  }
+]
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "e",
+    "paramLongName": "entitiesPath",
+    "paramDescription": "the input entities",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "path of the working directory",
+    "paramRequired": true
+  },
+  {
+    "paramName": "np",
+    "paramLongName": "numPartitions",
+    "paramDescription": "number of partitions for the similarity relations intermediate phases",
+    "paramRequired": false
+  },
+  {
+    "paramName": "dc",
+    "paramLongName": "dedupConfPath",
+    "paramDescription": "path of the dedup configuration",
+    "paramRequired": false
+  }
+]
@@ -51,7 +51,7 @@ public class TreeNodeDef implements Serializable {
                    fieldConf.getComparator() + " on " + fieldConf.getField() + " " + fields.indexOf(fieldConf),
                    new FieldStats(
                            weight,
-                           Double.parseDouble(fieldConf.getParams().getOrDefault("threshold", "-1.0")),
+                           Double.parseDouble(fieldConf.getParams().getOrDefault("threshold", "1.0")),
                            result,
                            fieldConf.isCountIfUndefined(),
                            doc1.getFieldMap().get(fieldConf.getField()),