From 890b49fb5d310e0e620d117474ea62198f291b4d Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 29 Jun 2023 14:08:58 +0200
Subject: [PATCH] optimized some dedup functions

---
 .../pace/common/AbstractPaceFunctions.java    |  9 ++--
 .../eu/dnetlib/pace/tree/JsonListMatch.java   |  8 ++-
 .../dhp/oa/dedup/SparkCreateSimRels.java      | 51 ++++++++++---------
 3 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java
index 06a955ba5..f21b4d5b3 100644
--- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java
@@ -49,14 +49,14 @@ public abstract class AbstractPaceFunctions {
 	protected static Set<String> ngramBlacklist = loadFromClasspath("/eu/dnetlib/pace/config/ngram_blacklist.txt");
 
 	// html regex for normalization
-	public final String HTML_REGEX = "<[^>]*>";
+	public final Pattern HTML_REGEX = Pattern.compile("<[^>]*>");
 
 	private static final String alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ";
 	private static final String aliases_from = "⁰¹²³⁴⁵⁶⁷⁸⁹⁺⁻⁼⁽⁾ⁿ₀₁₂₃₄₅₆₇₈₉₊₋₌₍₎àáâäæãåāèéêëēėęəîïíīįìôöòóœøōõûüùúūßśšłžźżçćčñń";
 	private static final String aliases_to = "0123456789+-=()n0123456789+-=()aaaaaaaaeeeeeeeeiiiiiioooooooouuuuussslzzzcccnn";
 
 	// doi prefix for normalization
-	public final String DOI_PREFIX = "(https?:\\/\\/dx\\.doi\\.org\\/)|(doi:)";
+	public final Pattern DOI_PREFIX = Pattern.compile("(https?:\\/\\/dx\\.doi\\.org\\/)|(doi:)");
 
 	private Pattern numberPattern = Pattern.compile("-?\\d+(\\.\\d+)?");
 
@@ -67,8 +67,7 @@ public abstract class AbstractPaceFunctions {
 	}
 
 	protected String cleanup(final String s) {
-
-		final String s1 = s.replaceAll(HTML_REGEX, "");
+		final String s1 = HTML_REGEX.matcher(s).replaceAll("");
 		final String s2 = unicodeNormalization(s1.toLowerCase());
 		final String s3 = nfd(s2);
 		final String s4 = fixXML(s3);
@@ -302,7 +301,7 @@ public abstract class AbstractPaceFunctions {
 	}
 
 	public String normalizePid(String pid) {
-		return pid.toLowerCase().replaceAll(DOI_PREFIX, "");
+		return DOI_PREFIX.matcher(pid.toLowerCase()).replaceAll("");
 	}
 
 	// get the list of keywords into the input string
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java
index 43fbfb07d..1f6b76fe6 100644
--- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java
@@ -6,6 +6,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.Option;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -15,6 +18,7 @@ import eu.dnetlib.pace.config.Config;
 import eu.dnetlib.pace.tree.support.AbstractListComparator;
 import eu.dnetlib.pace.tree.support.ComparatorClass;
 import eu.dnetlib.pace.util.MapDocumentUtil;
+import com.jayway.jsonpath.JsonPath;
 
 @ComparatorClass("jsonListMatch")
 public class JsonListMatch extends AbstractListComparator {
@@ -59,11 +63,11 @@ public class JsonListMatch extends AbstractListComparator {
 
 		StringBuilder st = new StringBuilder(); // to build the string used for comparisons basing on the jpath into
 												// parameters
-
+		final DocumentContext documentContext = JsonPath.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)).parse(json);
 		// for each path in the param list
 		for (String key : params.keySet().stream().filter(k -> k.contains("jpath")).collect(Collectors.toList())) {
 			String path = params.get(key);
-			String value = MapDocumentUtil.getJPathString(path, json);
+			String value = MapDocumentUtil.getJPathString(path, documentContext);
 			if (value == null || value.isEmpty())
 				value = "";
 			st.append(value);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index fa9bd39be..d9b71badd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -18,12 +18,12 @@ import org.xml.sax.SAXException;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.application.dedup.log.DedupLogModel;
 import eu.dnetlib.dhp.application.dedup.log.DedupLogWriter;
-import eu.dnetlib.dhp.oa.dedup.model.SparkDedupConfig;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.SparkDedupConfig;
 
 public class SparkCreateSimRels extends AbstractSparkAction {
 
@@ -36,21 +36,21 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 
 	public static void main(String[] args) throws Exception {
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
+				IOUtils
+					.toString(
+						SparkCreateSimRels.class
+							.getResourceAsStream(
+								"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
 		parser.parseArgument(args);
 
 		SparkConf conf = new SparkConf();
 		new SparkCreateSimRels(parser, getSparkSession(conf))
-			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+				.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
 	}
 
 	@Override
 	public void run(ISLookUpService isLookUpService)
-		throws DocumentException, IOException, ISLookUpException, SAXException {
+			throws DocumentException, IOException, ISLookUpException, SAXException {
 
 		// read oozie parameters
 		final String graphBasePath = parser.get("graphBasePath");
@@ -58,15 +58,16 @@
 		final String actionSetId = parser.get("actionSetId");
 		final String workingPath = parser.get("workingPath");
 		final int numPartitions = Optional
-			.ofNullable(parser.get("numPartitions"))
-			.map(Integer::valueOf)
-			.orElse(NUM_PARTITIONS);
+				.ofNullable(parser.get("numPartitions"))
+				.map(Integer::valueOf)
+				.orElse(NUM_PARTITIONS);
 
 		log.info("numPartitions: '{}'", numPartitions);
 		log.info("graphBasePath: '{}'", graphBasePath);
 		log.info("isLookUpUrl: '{}'", isLookUpUrl);
 		log.info("actionSetId: '{}'", actionSetId);
 		log.info("workingPath: '{}'", workingPath);
+
 		final String dfLogPath = parser.get("dataframeLog");
 		final String runTag = Optional.ofNullable(parser.get("runTAG")).orElse("UNKNOWN");
 
@@ -86,25 +87,25 @@
 			SparkDedupConfig sparkConfig = new SparkDedupConfig(dedupConf, numPartitions);
 
 			Dataset<Relation> simRels = spark
-				.read()
-				.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
-				.transform(sparkConfig.modelExtractor()) // Extract fields from input json column according to model
-				// definition
-				.transform(sparkConfig.generateAndProcessClustersWithJoins()) // generate pairs according to
-				// filters, clusters, and model
-				// definition
-				// .transform(sparkConfig.processClusters()) // process blocks and emits pairs of found
-				// similarities
-				.map(
-					(MapFunction<Row, Relation>) t -> DedupUtility
-						.createSimRel(t.getStruct(0).getString(0), t.getStruct(0).getString(1), entity),
-					Encoders.bean(Relation.class));
+					.read()
+					.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+					.transform(sparkConfig.modelExtractor()) // Extract fields from input json column according to model
+					// definition
+					.transform(sparkConfig.generateClustersWithDFAPI()) // generate pairs according to
+					// filters, clusters, and model
+					// definition
+					.transform(sparkConfig.processClusters()) // process blocks and emits pairs of found
+					// similarities
+					.map(
+						(MapFunction<Row, Relation>) t -> DedupUtility
+							.createSimRel(t.getStruct(0).getString(0), t.getStruct(0).getString(1), entity),
+						Encoders.bean(Relation.class));
 
 			saveParquet(simRels, outputPath, SaveMode.Overwrite);
 
 			final long end = System.currentTimeMillis();
 			if (StringUtils.isNotBlank(dfLogPath)) {
 				final DedupLogModel model = new DedupLogModel(runTag, dedupConf.toString(), subEntity, start, end,
-					end - start);
+						end - start);
 				new DedupLogWriter(dfLogPath).appendLog(model, spark);
 			}
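
The AbstractPaceFunctions hunks swap per-call String.replaceAll(...) for precompiled Pattern constants: String.replaceAll compiles its regex argument on every invocation, while a Pattern compiled once is reused across the many cleanup()/normalizePid() calls a dedup run makes. A minimal standalone sketch of the idea (class and method names here are illustrative, not from the codebase):

import java.util.regex.Pattern;

// Sketch only: precompiled Pattern reuse vs. String.replaceAll, which
// calls Pattern.compile internally on every invocation.
public class PatternReuseSketch {

	// compiled once, reused for every record
	private static final Pattern HTML_TAGS = Pattern.compile("<[^>]*>");
	private static final Pattern DOI_PREFIX = Pattern.compile("(https?:\\/\\/dx\\.doi\\.org\\/)|(doi:)");

	static String stripHtml(String s) {
		// equivalent to s.replaceAll("<[^>]*>", "") without the recompilation
		return HTML_TAGS.matcher(s).replaceAll("");
	}

	static String normalizePid(String pid) {
		return DOI_PREFIX.matcher(pid.toLowerCase()).replaceAll("");
	}

	public static void main(String[] args) {
		System.out.println(stripHtml("<b>Some</b> title")); // Some title
		System.out.println(normalizePid("doi:10.1000/ABC")); // 10.1000/abc
	}
}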
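
The JsonListMatch hunk applies the same hoisting idea to JSON parsing: the record is parsed once into a jayway DocumentContext, and every configured jpath is evaluated against the already-parsed tree instead of re-parsing the raw string per path; Option.SUPPRESS_EXCEPTIONS makes unmatched paths return null rather than throw PathNotFoundException. A self-contained sketch against the json-path API alone (the MapDocumentUtil overload is not reproduced here, and the sample document is invented):

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;

public class JsonPathReuseSketch {
	public static void main(String[] args) {
		String json = "{\"pid\":[{\"value\":\"10.1000/abc\",\"qualifier\":\"doi\"}]}";

		// parse once; SUPPRESS_EXCEPTIONS turns missing paths into null results
		DocumentContext ctx = JsonPath
			.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS))
			.parse(json);

		// each read() walks the parsed tree; the raw string is never re-parsed
		Object value = ctx.read("$.pid[0].value");
		Object missing = ctx.read("$.pid[0].nonexistent");

		System.out.println(value);   // 10.1000/abc
		System.out.println(missing); // null
	}
}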
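
In SparkCreateSimRels the similarity pipeline keeps the Dataset.transform composition style, now as two explicit stages (generateClustersWithDFAPI, then processClusters) where the previous code used the fused generateAndProcessClustersWithJoins. SparkDedupConfig's transformers are project-specific, so the following is only a toy illustration of the chaining style itself, with made-up stages; it assumes Spark built against Scala 2.12+, where a Java lambda can stand in for the scala.Function1 that transform() expects:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TransformChainSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

		Dataset<Row> input = spark.range(10).toDF("id");

		// each transform() stage is a named, independently testable step,
		// mirroring modelExtractor -> generateClustersWithDFAPI -> processClusters
		Dataset<Row> result = input
			.transform(df -> df.withColumn("even", col("id").mod(2).equalTo(0)))
			.transform(df -> df.filter(col("even")));

		result.show();

		spark.stop();
	}
}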