forked from D-Net/dnet-hadoop
small changes
This commit is contained in:
parent
890b49fb5d
commit
df19548c56
|
@ -67,7 +67,7 @@ public abstract class AbstractPaceFunctions {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String cleanup(final String s) {
|
protected String cleanup(final String s) {
|
||||||
final String s1 = HTML_REGEX.matcher(s).replaceAll( "");
|
final String s1 = HTML_REGEX.matcher(s).replaceAll("");
|
||||||
final String s2 = unicodeNormalization(s1.toLowerCase());
|
final String s2 = unicodeNormalization(s1.toLowerCase());
|
||||||
final String s3 = nfd(s2);
|
final String s3 = nfd(s2);
|
||||||
final String s4 = fixXML(s3);
|
final String s4 = fixXML(s3);
|
||||||
|
|
|
@ -91,7 +91,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
||||||
} else {
|
} else {
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
})
|
}).checkpoint()
|
||||||
|
|
||||||
var relBlocks: Dataset[Row] = null
|
var relBlocks: Dataset[Row] = null
|
||||||
|
|
||||||
|
@ -178,8 +178,8 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
||||||
|
|
||||||
val res = relBlocks.filter(col("match").equalTo(true))
|
val res = relBlocks.filter(col("match").equalTo(true))
|
||||||
.select(col("l.identifier").as("from"), col("r.identifier").as("to"))
|
.select(col("l.identifier").as("from"), col("r.identifier").as("to"))
|
||||||
.repartition()
|
//.repartition()
|
||||||
.dropDuplicates()
|
.distinct()
|
||||||
|
|
||||||
// res.show(false)
|
// res.show(false)
|
||||||
res.select(functions.struct("from", "to"))
|
res.select(functions.struct("from", "to"))
|
||||||
|
|
|
@ -6,19 +6,19 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.jayway.jsonpath.Configuration;
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
|
||||||
import com.jayway.jsonpath.Option;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
import com.jayway.jsonpath.Configuration;
|
||||||
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
import com.jayway.jsonpath.JsonPath;
|
||||||
|
import com.jayway.jsonpath.Option;
|
||||||
|
|
||||||
import eu.dnetlib.pace.config.Config;
|
import eu.dnetlib.pace.config.Config;
|
||||||
import eu.dnetlib.pace.tree.support.AbstractListComparator;
|
import eu.dnetlib.pace.tree.support.AbstractListComparator;
|
||||||
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
||||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
|
||||||
|
|
||||||
@ComparatorClass("jsonListMatch")
|
@ComparatorClass("jsonListMatch")
|
||||||
public class JsonListMatch extends AbstractListComparator {
|
public class JsonListMatch extends AbstractListComparator {
|
||||||
|
@ -63,7 +63,9 @@ public class JsonListMatch extends AbstractListComparator {
|
||||||
|
|
||||||
StringBuilder st = new StringBuilder(); // to build the string used for comparisons basing on the jpath into
|
StringBuilder st = new StringBuilder(); // to build the string used for comparisons basing on the jpath into
|
||||||
// parameters
|
// parameters
|
||||||
final DocumentContext documentContext = JsonPath.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)).parse(json);
|
final DocumentContext documentContext = JsonPath
|
||||||
|
.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS))
|
||||||
|
.parse(json);
|
||||||
// for each path in the param list
|
// for each path in the param list
|
||||||
for (String key : params.keySet().stream().filter(k -> k.contains("jpath")).collect(Collectors.toList())) {
|
for (String key : params.keySet().stream().filter(k -> k.contains("jpath")).collect(Collectors.toList())) {
|
||||||
String path = params.get(key);
|
String path = params.get(key);
|
||||||
|
|
|
@ -89,7 +89,7 @@ public class BlockProcessor {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i > wf.getSlidingWindowSize()) {
|
if (++i > wf.getSlidingWindowSize()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -91,10 +91,10 @@ public class SparkCreateSimRels extends AbstractSparkAction {
|
||||||
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||||
.transform(sparkConfig.modelExtractor()) // Extract fields from input json column according to model
|
.transform(sparkConfig.modelExtractor()) // Extract fields from input json column according to model
|
||||||
// definition
|
// definition
|
||||||
.transform(sparkConfig.generateClustersWithDFAPI()) // generate <key,block> pairs according to
|
.transform(sparkConfig.generateAndProcessClustersWithJoins()) // generate <key,block> pairs according to
|
||||||
// filters, clusters, and model
|
// filters, clusters, and model
|
||||||
// definition
|
// definition
|
||||||
.transform(sparkConfig.processClusters()) // process blocks and emits <from,to> pairs of found
|
// .transform(sparkConfig.processClusters()) // process blocks and emits <from,to> pairs of found
|
||||||
// similarities
|
// similarities
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<Row, Relation>) t -> DedupUtility
|
(MapFunction<Row, Relation>) t -> DedupUtility
|
||||||
|
|
Loading…
Reference in New Issue