diff --git a/.gitignore b/.gitignore
index 73d9179fa..14cd4d345 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,3 +26,4 @@ spark-warehouse
/**/*.log
/**/.factorypath
/**/.scalafmt.conf
+/.java-version
diff --git a/README.md b/README.md
index 0a0bd82ab..2c1440f44 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,128 @@
# dnet-hadoop
-Dnet-hadoop is the project that defined all the OOZIE workflows for the OpenAIRE Graph construction, processing, provisioning.
\ No newline at end of file
+
+Dnet-hadoop is the project that defines all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing and provisioning.
+
+How to build, package and run oozie workflows
+====================
+
+Oozie-installer is a utility that supports building, uploading and running oozie workflows. In practice, it creates a `*.tar.gz`
+package containing the resources that define a workflow and some helper scripts.
+
+This module is automatically executed when running:
+
+`mvn package -Poozie-package -Dworkflow.source.dir=classpath/to/parent/directory/of/oozie_app`
+
+on a module having set:
+
+```
+<parent>
+    <groupId>eu.dnetlib.dhp</groupId>
+    <artifactId>dhp-workflows</artifactId>
+</parent>
+```
+
+in the `pom.xml` file. The `oozie-package` profile initializes the oozie workflow packaging, while the `workflow.source.dir` property points to
+a workflow (notice: this is not a relative path but a classpath to the directory usually holding the `oozie_app` subdirectory).
+
+The outcome of this packaging is an `oozie-package.tar.gz` file containing all the resources required to run the Oozie workflow:
+
+- jar packages
+- workflow definitions
+- job properties
+- maintenance scripts
+
+Required properties
+====================
+
+In order to include the proper workflow within the package, the `workflow.source.dir` property has to be set. It can be provided
+as the `-Dworkflow.source.dir=some/job/dir` maven parameter.
+
+In order to define the full set of cluster environment properties one should create a `~/.dhp/application.properties` file with
+the following properties (a sample file is shown right after the list):
+
+- `dhp.hadoop.frontend.user.name` - your user name on hadoop cluster and frontend machine
+- `dhp.hadoop.frontend.host.name` - frontend host name
+- `dhp.hadoop.frontend.temp.dir` - frontend directory for temporary files
+- `dhp.hadoop.frontend.port.ssh` - frontend machine ssh port
+- `oozieServiceLoc` - oozie service location required by run_workflow.sh script executing oozie job
+- `nameNode` - name node address
+- `jobTracker` - job tracker address
+- `oozie.execution.log.file.location` - location of the file that will be created when executing the oozie job; it contains the output
+produced by the `run_workflow.sh` script (needed to obtain the oozie job id)
+- `maven.executable` - mvn command location, requires parameterization due to a different setup of CI cluster
+- `sparkDriverMemory` - amount of memory assigned to spark jobs driver
+- `sparkExecutorMemory` - amount of memory assigned to spark jobs executors
+- `sparkExecutorCores` - number of cores assigned to spark jobs executors
+
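+For reference, a minimal `~/.dhp/application.properties` could look like the following; every value below is a
+placeholder to be adapted to the target cluster, not a default shipped with the project:
+
+```
+dhp.hadoop.frontend.user.name=dnet.user
+dhp.hadoop.frontend.host.name=hadoop-frontend.example.org
+dhp.hadoop.frontend.temp.dir=/home/dnet.user/tmp
+dhp.hadoop.frontend.port.ssh=22
+oozieServiceLoc=http://oozie.example.org:11000/oozie
+nameNode=hdfs://namenode.example.org:8020
+jobTracker=resourcemanager.example.org:8032
+oozie.execution.log.file.location=target/oozie-execution.log
+maven.executable=mvn
+sparkDriverMemory=7G
+sparkExecutorMemory=7G
+sparkExecutorCores=4
+```
+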
+All values will be overridden with the ones from `job.properties` and, if present, `job-override.properties` stored in the module's
+main folder.
+
+To override properties from `job.properties`, a `job-override.properties` file can be created in the main module directory
+(the one containing the `pom.xml` file), defining all the new properties that should override the existing ones.
+Alternatively, those properties can be provided one by one as command line `-D` arguments.
+
+Properties overriding order is the following:
+
+1. `pom.xml` defined properties (located in the project root dir)
+2. `~/.dhp/application.properties` defined properties
+3. `${workflow.source.dir}/job.properties`
+4. `job-override.properties` (located in the project root dir)
+5. `maven -Dparam=value`
+
+where the maven `-Dparam` properties override all the other ones.
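+
+For example, a single property such as the spark driver memory can be overridden directly on the command line
+(the workflow path and memory size below are illustrative):
+
+```
+mvn clean package -Poozie-package -Dworkflow.source.dir=eu/dnetlib/dhp/examples/oozie_app -DsparkDriverMemory=10G
+```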
+
+Workflow definition requirements
+====================
+
+`workflow.source.dir` property should point to the following directory structure:
+
+ [${workflow.source.dir}]
+ |
+ |-job.properties (optional)
+ |
+ \-[oozie_app]
+ |
+ \-workflow.xml
+
+This property can be set using the maven `-D` switch.
+
+`[oozie_app]` is the default directory name, however it can be set to any value as long as the `oozieAppDir` property is
+provided with the directory name as its value.
+
+Sub-workflows are supported as well; sub-workflow directories should be nested within the `[oozie_app]` directory.
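+
+A hypothetical layout with a single sub-workflow (directory names are illustrative) could therefore look like:
+
+    [${workflow.source.dir}]
+     |
+     |-job.properties (optional)
+     |
+     \-[oozie_app]
+        |
+        |-workflow.xml
+        |
+        \-[sub_workflow]
+           |
+           \-workflow.xml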
+
+Creating oozie installer step-by-step
+=====================================
+
+Automated oozie-installer steps are the following:
+
+1. creating jar packages: `*.jar` and `*tests.jar`, along with copying all dependencies to `target/dependencies`
+2. reading properties from maven, `~/.dhp/application.properties`, `job.properties`, `job-override.properties`
+3. invoking the priming mechanism that links resources from the import.txt file (currently resolving sub-workflow resources)
+4. assembling shell scripts for preparing the Hadoop filesystem, uploading the Oozie application and starting the workflow
+5. copying the whole `${workflow.source.dir}` content to `target/${oozie.package.file.name}`
+6. generating an updated `job.properties` file in `target/${oozie.package.file.name}` based on maven,
+`~/.dhp/application.properties`, `job.properties` and `job-override.properties`
+7. creating a `lib` directory (or multiple directories, one for each nested sub-workflow directory) and copying the jar packages
+created at step (1) into each one of them
+8. bundling the whole `${oozie.package.file.name}` directory into a single tar.gz package
+
+Uploading oozie package and running workflow on cluster
+=======================================================
+
+In order to simplify the deployment and execution process, two dedicated profiles were introduced:
+
+- `deploy`
+- `run`
+
+to be used along with the `oozie-package` profile, e.g. by providing the `-Poozie-package,deploy,run` maven parameters.
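+
+A complete invocation packaging, deploying and running a workflow in one go could then look like this (the workflow
+path below is just an example):
+
+```
+mvn clean package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/examples/oozie_app
+```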
+
+The `deploy` profile supplements the packaging process with:
+1) uploading oozie-package via scp to `/home/${user.name}/oozie-packages` directory on `${dhp.hadoop.frontend.host.name}` machine
+2) extracting uploaded package
+3) uploading oozie content to hadoop cluster HDFS location defined in `oozie.wf.application.path` property (generated dynamically by maven build process, based on `${dhp.hadoop.frontend.user.name}` and `workflow.source.dir` properties)
+
+The `run` profile introduces:
+1) executing the oozie application uploaded to the HDFS cluster by the `deploy` step; it triggers the `run_workflow.sh` script, providing the runtime properties defined in the `job.properties` file.
+
+Notice: ssh access to the frontend machine has to be configured at the system level, and it is preferable to set up key-based authentication in order to simplify remote operations.
\ No newline at end of file
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 6df11f4ea..6198bd81e 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -52,6 +52,8 @@
+					<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
+					<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
 					<scalaVersion>${scala.version}</scalaVersion>
@@ -60,6 +62,11 @@
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-pace-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 			<groupId>org.apache.hadoop</groupId>
@@ -76,11 +83,11 @@
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-core_2.11</artifactId>
+			<artifactId>spark-core_${scala.binary.version}</artifactId>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-sql_2.11</artifactId>
+			<artifactId>spark-sql_${scala.binary.version}</artifactId>
@@ -142,11 +149,6 @@
 			<artifactId>okhttp</artifactId>
-		<dependency>
-			<groupId>eu.dnetlib</groupId>
-			<artifactId>dnet-pace-core</artifactId>
-		</dependency>
-
 			<groupId>org.apache.httpcomponents</groupId>
 			<artifactId>httpclient</artifactId>
@@ -159,7 +161,7 @@
 			<groupId>eu.dnetlib.dhp</groupId>
-			<artifactId>dhp-schemas</artifactId>
+			<artifactId>${dhp-schemas.artifact}</artifactId>
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
index 3f65d754f..cf0a183d7 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
@@ -11,25 +11,18 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@@ -54,44 +47,51 @@ public class DispatchEntitiesSparkJob {
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
- String graphTableClassName = parser.get("graphTableClassName");
- log.info("graphTableClassName: {}", graphTableClassName);
-
- @SuppressWarnings("unchecked")
- Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
+ boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
+ log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
- spark -> {
- HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
- dispatchEntities(spark, inputPath, entityClazz, outputPath);
- });
+ spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible));
}
- private static <T extends Oaf> void dispatchEntities(
+ private static void dispatchEntities(
SparkSession spark,
String inputPath,
- Class<T> clazz,
- String outputPath) {
+ String outputPath,
+ boolean filterInvisible) {
- spark
- .read()
- .textFile(inputPath)
- .filter((FilterFunction<String>) s -> isEntityType(s, clazz))
- .map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
- .map(
- (MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
- Encoders.bean(clazz))
- .write()
- .mode(SaveMode.Overwrite)
- .option("compression", "gzip")
- .json(outputPath);
+ Dataset<String> df = spark.read().textFile(inputPath);
+
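+ // one output dataset per entity type: every input line is prefixed with the entity class name followed by a '|' separator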
+ ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
+ String entityType = entry.getKey();
+ Class<?> clazz = entry.getValue();
+
+ final String entityPath = outputPath + "/" + entityType;
+ if (!entityType.equalsIgnoreCase("relation")) {
+ HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration());
+ Dataset<Row> entityDF = spark
+ .read()
+ .schema(Encoders.bean(clazz).schema())
+ .json(
+ df
+ .filter((FilterFunction<String>) s -> s.startsWith(clazz.getName()))
+ .map(
+ (MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"),
+ Encoders.STRING()));
+
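+ // optionally drop records that are flagged as invisible in their dataInfo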
+ if (filterInvisible) {
+ entityDF = entityDF.filter("dataInfo.invisible != true");
+ }
+
+ entityDF
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(entityPath);
+ }
+ });
}
-
- private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
- return StringUtils.substringBefore(s, "|").equals(clazz.getName());
- }
-
}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java
index d0f5a3b27..c0c451b88 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java
@@ -7,7 +7,7 @@ import java.util.regex.Pattern;
// https://researchguides.stevens.edu/c.php?g=442331&p=6577176
public class PmidCleaningRule {
- public static final Pattern PATTERN = Pattern.compile("[1-9]{1,8}");
+ public static final Pattern PATTERN = Pattern.compile("0*(\\d{1,8})");
public static String clean(String pmid) {
String s = pmid
@@ -17,7 +17,7 @@ public class PmidCleaningRule {
final Matcher m = PATTERN.matcher(s);
if (m.find()) {
- return m.group();
+ return m.group(1);
}
return "";
}
diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json
index aa8d2a7c2..60f11ac84 100644
--- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json
+++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json
@@ -18,9 +18,9 @@
"paramRequired": true
},
{
- "paramName": "c",
- "paramLongName": "graphTableClassName",
- "paramDescription": "the graph entity class name",
+ "paramName": "fi",
+ "paramLongName": "filterInvisible",
+ "paramDescription": "if true filters out invisible entities",
"paramRequired": true
}
]
\ No newline at end of file
diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogModel.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogModel.scala
new file mode 100644
index 000000000..d74ec3f69
--- /dev/null
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogModel.scala
@@ -0,0 +1,10 @@
+package eu.dnetlib.dhp.application.dedup.log
+
+case class DedupLogModel(
+ tag: String,
+ configuration: String,
+ entity: String,
+ startTS: Long,
+ endTS: Long,
+ totalMs: Long
+) {}
diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogWriter.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogWriter.scala
new file mode 100644
index 000000000..4409c01d9
--- /dev/null
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogWriter.scala
@@ -0,0 +1,14 @@
+package eu.dnetlib.dhp.application.dedup.log
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+class DedupLogWriter(path: String) {
+
+ def appendLog(dedupLogModel: DedupLogModel, spark: SparkSession): Unit = {
+ import spark.implicits._
+ val df = spark.createDataset[DedupLogModel](data = List(dedupLogModel))
+ df.write.mode(SaveMode.Append).save(path)
+
+ }
+
+}
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java
index 98f0a3cd1..f87f6e313 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java
@@ -8,7 +8,6 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -16,8 +15,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class MdStoreClientTest {
- @Test
- @Disabled
+ // @Test
public void testMongoCollection() throws IOException {
final MdstoreClient client = new MdstoreClient("mongodb://localhost:27017", "mdstore");
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java
index 9562adf7e..295eac85f 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java
@@ -9,10 +9,16 @@ class PmidCleaningRuleTest {
@Test
void testCleaning() {
+ // leading zeros are removed
assertEquals("1234", PmidCleaningRule.clean("01234"));
+ // tolerant to spaces in the middle
assertEquals("1234567", PmidCleaningRule.clean("0123 4567"));
+ // stop parsing at the first non-numerical char
assertEquals("123", PmidCleaningRule.clean("0123x4567"));
+ // invalid id leading to empty result
assertEquals("", PmidCleaningRule.clean("abc"));
+ // valid id with zeroes in the number
+ assertEquals("20794075", PmidCleaningRule.clean("20794075"));
}
}
diff --git a/dhp-pace-core/pom.xml b/dhp-pace-core/pom.xml
new file mode 100644
index 000000000..fd7f44fc9
--- /dev/null
+++ b/dhp-pace-core/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>eu.dnetlib.dhp</groupId>
+		<artifactId>dhp</artifactId>
+		<version>1.2.5-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+
+	<groupId>eu.dnetlib.dhp</groupId>
+	<artifactId>dhp-pace-core</artifactId>
+	<version>1.2.5-SNAPSHOT</version>
+	<packaging>jar</packaging>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>${net.alchim31.maven.version}</version>
+				<executions>
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>initialize</phase>
+						<goals>
+							<goal>add-source</goal>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
+					<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
+					<scalaVersion>${scala.version}</scalaVersion>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>edu.cmu</groupId>
+			<artifactId>secondstring</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.gson</groupId>
+			<artifactId>gson</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.antlr</groupId>
+			<artifactId>stringtemplate</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>commons-logging</groupId>
+			<artifactId>commons-logging</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.reflections</groupId>
+			<artifactId>reflections</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-math3</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.jayway.jsonpath</groupId>
+			<artifactId>json-path</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.ibm.icu</groupId>
+			<artifactId>icu4j</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-core_${scala.binary.version}</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-sql_${scala.binary.version}</artifactId>
+		</dependency>
+	</dependencies>
+
+</project>
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/AbstractClusteringFunction.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/AbstractClusteringFunction.java
new file mode 100644
index 000000000..3da8eb490
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/AbstractClusteringFunction.java
@@ -0,0 +1,46 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import eu.dnetlib.pace.common.AbstractPaceFunctions;
+import eu.dnetlib.pace.config.Config;
+
+public abstract class AbstractClusteringFunction extends AbstractPaceFunctions implements ClusteringFunction {
+
+ protected Map<String, Integer> params;
+
+ public AbstractClusteringFunction(final Map<String, Integer> params) {
+ this.params = params;
+ }
+
+ protected abstract Collection<String> doApply(Config conf, String s);
+
+ @Override
+ public Collection<String> apply(Config conf, List<String> fields) {
+ return fields
+ .stream()
+ .filter(f -> !f.isEmpty())
+ .map(this::normalize)
+ .map(s -> filterAllStopWords(s))
+ .map(s -> doApply(conf, s))
+ .map(c -> filterBlacklisted(c, ngramBlacklist))
+ .flatMap(c -> c.stream())
+ .filter(StringUtils::isNotBlank)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
+ public Map<String, Integer> getParams() {
+ return params;
+ }
+
+ protected Integer param(String name) {
+ return params.get(name);
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Acronyms.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Acronyms.java
new file mode 100644
index 000000000..9072fbb4b
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Acronyms.java
@@ -0,0 +1,51 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("acronyms")
+public class Acronyms extends AbstractClusteringFunction {
+
+ public Acronyms(Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(Config conf, String s) {
+ return extractAcronyms(s, param("max"), param("minLen"), param("maxLen"));
+ }
+
+ private Set<String> extractAcronyms(final String s, int maxAcronyms, int minLen, int maxLen) {
+
+ final Set<String> acronyms = Sets.newLinkedHashSet();
+
+ for (int i = 0; i < maxAcronyms; i++) {
+
+ final StringTokenizer st = new StringTokenizer(s);
+ final StringBuilder sb = new StringBuilder();
+
+ while (st.hasMoreTokens()) {
+ final String token = st.nextToken();
+ if (sb.length() > maxLen) {
+ break;
+ }
+ if (token.length() > 1 && i < token.length()) {
+ sb.append(token.charAt(i));
+ }
+ }
+ String acronym = sb.toString();
+ if (acronym.length() > minLen) {
+ acronyms.add(acronym);
+ }
+ }
+ return acronyms;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringClass.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringClass.java
new file mode 100644
index 000000000..3bb845b15
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringClass.java
@@ -0,0 +1,14 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ClusteringClass {
+
+ public String value();
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringFunction.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringFunction.java
new file mode 100644
index 000000000..8b7852418
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringFunction.java
@@ -0,0 +1,16 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import eu.dnetlib.pace.config.Config;
+
+public interface ClusteringFunction {
+
+ public Collection<String> apply(Config config, List<String> fields);
+
+ public Map<String, Integer> getParams();
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ImmutableFieldValue.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ImmutableFieldValue.java
new file mode 100644
index 000000000..bc8844aee
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ImmutableFieldValue.java
@@ -0,0 +1,28 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("immutablefieldvalue")
+public class ImmutableFieldValue extends AbstractClusteringFunction {
+
+ public ImmutableFieldValue(final Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(final Config conf, final String s) {
+ final List<String> res = Lists.newArrayList();
+
+ res.add(s);
+
+ return res;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/KeywordsClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/KeywordsClustering.java
new file mode 100644
index 000000000..38299adb4
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/KeywordsClustering.java
@@ -0,0 +1,54 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("keywordsclustering")
+public class KeywordsClustering extends AbstractClusteringFunction {
+
+ public KeywordsClustering(Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(final Config conf, String s) {
+
+ // takes city codes and keywords codes without duplicates
+ Set<String> keywords = getKeywords(s, conf.translationMap(), params.getOrDefault("windowSize", 4));
+ Set<String> cities = getCities(s, params.getOrDefault("windowSize", 4));
+
+ // list of combination to return as result
+ final Collection<String> combinations = new LinkedHashSet<>();
+
+ for (String keyword : keywordsToCodes(keywords, conf.translationMap())) {
+ for (String city : citiesToCodes(cities)) {
+ combinations.add(keyword + "-" + city);
+ if (combinations.size() >= params.getOrDefault("max", 2)) {
+ return combinations;
+ }
+ }
+ }
+
+ return combinations;
+ }
+
+ @Override
+ public Collection<String> apply(final Config conf, List<String> fields) {
+ return fields
+ .stream()
+ .filter(f -> !f.isEmpty())
+ .map(this::cleanup)
+ .map(this::normalize)
+ .map(s -> filterAllStopWords(s))
+ .map(s -> doApply(conf, s))
+ .map(c -> filterBlacklisted(c, ngramBlacklist))
+ .flatMap(c -> c.stream())
+ .filter(StringUtils::isNotBlank)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LastNameFirstInitial.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LastNameFirstInitial.java
new file mode 100644
index 000000000..5a385961a
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LastNameFirstInitial.java
@@ -0,0 +1,79 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.model.Person;
+
+@ClusteringClass("lnfi")
+public class LastNameFirstInitial extends AbstractClusteringFunction {
+
+ private boolean DEFAULT_AGGRESSIVE = true;
+
+ public LastNameFirstInitial(final Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ public Collection<String> apply(Config conf, List<String> fields) {
+ return fields
+ .stream()
+ .filter(f -> !f.isEmpty())
+ .map(this::normalize)
+ .map(s -> doApply(conf, s))
+ .map(c -> filterBlacklisted(c, ngramBlacklist))
+ .flatMap(c -> c.stream())
+ .filter(StringUtils::isNotBlank)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
+ @Override
+ protected String normalize(final String s) {
+ return fixAliases(transliterate(nfd(unicodeNormalization(s))))
+ // do not compact the regexes in a single expression, would cause StackOverflowError in case of large input
+ // strings
+ .replaceAll("[^ \\w]+", "")
+ .replaceAll("(\\p{InCombiningDiacriticalMarks})+", "")
+ .replaceAll("(\\p{Punct})+", " ")
+ .replaceAll("(\\d)+", " ")
+ .replaceAll("(\\n)+", " ")
+ .trim();
+ }
+
+ @Override
+ protected Collection<String> doApply(final Config conf, final String s) {
+
+ final List<String> res = Lists.newArrayList();
+
+ final boolean aggressive = (Boolean) (getParams().containsKey("aggressive") ? getParams().get("aggressive")
+ : DEFAULT_AGGRESSIVE);
+
+ Person p = new Person(s, aggressive);
+
+ if (p.isAccurate()) {
+ String lastName = p.getNormalisedSurname().toLowerCase();
+ String firstInitial = p.getNormalisedFirstName().toLowerCase().substring(0, 1);
+
+ res.add(firstInitial.concat(lastName));
+ } else { // is not accurate, meaning it has no defined name and surname
+ List<String> fullname = Arrays.asList(p.getNormalisedFullname().split(" "));
+ if (fullname.size() == 1) {
+ res.add(p.getNormalisedFullname().toLowerCase());
+ } else if (fullname.size() == 2) {
+ res.add(fullname.get(0).substring(0, 1).concat(fullname.get(1)).toLowerCase());
+ res.add(fullname.get(1).substring(0, 1).concat(fullname.get(0)).toLowerCase());
+ } else {
+ res.add(fullname.get(0).substring(0, 1).concat(fullname.get(fullname.size() - 1)).toLowerCase());
+ res.add(fullname.get(fullname.size() - 1).substring(0, 1).concat(fullname.get(0)).toLowerCase());
+ }
+ }
+
+ return res;
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LowercaseClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LowercaseClustering.java
new file mode 100644
index 000000000..a3a6c4881
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LowercaseClustering.java
@@ -0,0 +1,38 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("lowercase")
+public class LowercaseClustering extends AbstractClusteringFunction {
+
+ public LowercaseClustering(final Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ public Collection<String> apply(Config conf, List<String> fields) {
+ Collection<String> c = Sets.newLinkedHashSet();
+ for (String f : fields) {
+ c.addAll(doApply(conf, f));
+ }
+ return c;
+ }
+
+ @Override
+ protected Collection<String> doApply(final Config conf, final String s) {
+ if (StringUtils.isBlank(s)) {
+ return Lists.newArrayList();
+ }
+ return Lists.newArrayList(s.toLowerCase().trim());
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NGramUtils.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NGramUtils.java
new file mode 100644
index 000000000..6ee80b86e
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NGramUtils.java
@@ -0,0 +1,24 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+
+import eu.dnetlib.pace.common.AbstractPaceFunctions;
+
+public class NGramUtils extends AbstractPaceFunctions {
+ static private final NGramUtils NGRAMUTILS = new NGramUtils();
+
+ private static final int SIZE = 100;
+
+ private static final Set<String> stopwords = AbstractPaceFunctions
+ .loadFromClasspath("/eu/dnetlib/pace/config/stopwords_en.txt");
+
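+ // normalizes the string, removes english stop words, truncates it to a fixed window and strips the spaces, producing a compact key suitable for ordering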
+ public static String cleanupForOrdering(String s) {
+ return (NGRAMUTILS.filterStopWords(NGRAMUTILS.normalize(s), stopwords) + StringUtils.repeat(" ", SIZE))
+ .substring(0, SIZE)
+ .replaceAll(" ", "");
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NgramPairs.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NgramPairs.java
new file mode 100644
index 000000000..aa06aa408
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NgramPairs.java
@@ -0,0 +1,41 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("ngrampairs")
+public class NgramPairs extends Ngrams {
+
+ public NgramPairs(Map<String, Integer> params) {
+ super(params, false);
+ }
+
+ public NgramPairs(Map<String, Integer> params, boolean sorted) {
+ super(params, sorted);
+ }
+
+ @Override
+ protected Collection<String> doApply(Config conf, String s) {
+ return ngramPairs(Lists.newArrayList(getNgrams(s, param("ngramLen"), param("max") * 2, 1, 2)), param("max"));
+ }
+
+ protected Collection<String> ngramPairs(final List<String> ngrams, int maxNgrams) {
+ Collection<String> res = Lists.newArrayList();
+ int j = 0;
+ for (int i = 0; i < ngrams.size() && res.size() < maxNgrams; i++) {
+ if (++j >= ngrams.size()) {
+ break;
+ }
+ res.add(ngrams.get(i) + ngrams.get(j));
+ // System.out.println("-- " + concatNgrams);
+ }
+ return res;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Ngrams.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Ngrams.java
new file mode 100644
index 000000000..96c305a16
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Ngrams.java
@@ -0,0 +1,52 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.*;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("ngrams")
+public class Ngrams extends AbstractClusteringFunction {
+
+ private final boolean sorted;
+
+ public Ngrams(Map<String, Integer> params) {
+ this(params, false);
+ }
+
+ public Ngrams(Map<String, Integer> params, boolean sorted) {
+ super(params);
+ this.sorted = sorted;
+ }
+
+ @Override
+ protected Collection<String> doApply(Config conf, String s) {
+ return getNgrams(s, param("ngramLen"), param("max"), param("maxPerToken"), param("minNgramLen"));
+ }
+
+ protected Collection<String> getNgrams(String s, int ngramLen, int max, int maxPerToken, int minNgramLen) {
+
+ final Collection<String> ngrams = sorted ? new TreeSet<>() : new LinkedHashSet<>();
+ final StringTokenizer st = new StringTokenizer(s);
+
+ while (st.hasMoreTokens()) {
+ final String token = st.nextToken();
+ if (!token.isEmpty()) {
+ for (int i = 0; i < maxPerToken && ngramLen + i <= token.length(); i++) {
+ String ngram = token.substring(i, Math.min(ngramLen + i, token.length())).trim();
+
+ if (ngram.length() >= minNgramLen) {
+ ngrams.add(ngram);
+
+ if (ngrams.size() >= max) {
+ return ngrams;
+ }
+ }
+ }
+ }
+ }
+ // System.out.println(ngrams + " n: " + ngrams.size());
+ return ngrams;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonClustering.java
new file mode 100644
index 000000000..b4a04ce65
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonClustering.java
@@ -0,0 +1,84 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.common.AbstractPaceFunctions;
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.model.Person;
+
+@ClusteringClass("personClustering")
+public class PersonClustering extends AbstractPaceFunctions implements ClusteringFunction {
+
+ private Map<String, Integer> params;
+
+ private static final int MAX_TOKENS = 5;
+
+ public PersonClustering(final Map<String, Integer> params) {
+ this.params = params;
+ }
+
+ @Override
+ public Collection<String> apply(final Config conf, final List<String> fields) {
+ final Set<String> hashes = Sets.newHashSet();
+
+ for (final String f : fields) {
+
+ final Person person = new Person(f, false);
+
+ if (StringUtils.isNotBlank(person.getNormalisedFirstName())
+ && StringUtils.isNotBlank(person.getNormalisedSurname())) {
+ hashes.add(firstLC(person.getNormalisedFirstName()) + person.getNormalisedSurname().toLowerCase());
+ } else {
+ for (final String token1 : tokens(f, MAX_TOKENS)) {
+ for (final String token2 : tokens(f, MAX_TOKENS)) {
+ if (!token1.equals(token2)) {
+ hashes.add(firstLC(token1) + token2);
+ }
+ }
+ }
+ }
+ }
+
+ return hashes;
+ }
+
+// @Override
+// public Collection apply(final List fields) {
+// final Set hashes = Sets.newHashSet();
+//
+// for (final Field f : fields) {
+//
+// final GTAuthor gta = GTAuthor.fromOafJson(f.stringValue());
+//
+// final Author a = gta.getAuthor();
+//
+// if (StringUtils.isNotBlank(a.getFirstname()) && StringUtils.isNotBlank(a.getSecondnames())) {
+// hashes.add(firstLC(a.getFirstname()) + a.getSecondnames().toLowerCase());
+// } else {
+// for (final String token1 : tokens(f.stringValue(), MAX_TOKENS)) {
+// for (final String token2 : tokens(f.stringValue(), MAX_TOKENS)) {
+// if (!token1.equals(token2)) {
+// hashes.add(firstLC(token1) + token2);
+// }
+// }
+// }
+// }
+// }
+//
+// return hashes;
+// }
+
+ @Override
+ public Map<String, Integer> getParams() {
+ return params;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonHash.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonHash.java
new file mode 100644
index 000000000..a3d58a9be
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonHash.java
@@ -0,0 +1,34 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.model.Person;
+
+@ClusteringClass("personHash")
+public class PersonHash extends AbstractClusteringFunction {
+
+ private boolean DEFAULT_AGGRESSIVE = false;
+
+ public PersonHash(final Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(final Config conf, final String s) {
+ final List<String> res = Lists.newArrayList();
+
+ final boolean aggressive = (Boolean) (getParams().containsKey("aggressive") ? getParams().get("aggressive")
+ : DEFAULT_AGGRESSIVE);
+
+ res.add(new Person(s, aggressive).hash());
+
+ return res;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/RandomClusteringFunction.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/RandomClusteringFunction.java
new file mode 100644
index 000000000..2aab926da
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/RandomClusteringFunction.java
@@ -0,0 +1,20 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.Map;
+
+import eu.dnetlib.pace.config.Config;
+
+public class RandomClusteringFunction extends AbstractClusteringFunction {
+
+ public RandomClusteringFunction(Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(final Config conf, String s) {
+ return null;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SortedNgramPairs.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SortedNgramPairs.java
new file mode 100644
index 000000000..b085ae26d
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SortedNgramPairs.java
@@ -0,0 +1,31 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.*;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("sortedngrampairs")
+public class SortedNgramPairs extends NgramPairs {
+
+ public SortedNgramPairs(Map<String, Integer> params) {
+ super(params, false);
+ }
+
+ @Override
+ protected Collection<String> doApply(Config conf, String s) {
+
+ final List<String> tokens = Lists.newArrayList(Splitter.on(" ").omitEmptyStrings().trimResults().split(s));
+
+ Collections.sort(tokens);
+
+ return ngramPairs(
+ Lists.newArrayList(getNgrams(Joiner.on(" ").join(tokens), param("ngramLen"), param("max") * 2, 1, 2)),
+ param("max"));
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SpaceTrimmingFieldValue.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SpaceTrimmingFieldValue.java
new file mode 100644
index 000000000..392aecc79
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SpaceTrimmingFieldValue.java
@@ -0,0 +1,34 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("spacetrimmingfieldvalue")
+public class SpaceTrimmingFieldValue extends AbstractClusteringFunction {
+
+ public SpaceTrimmingFieldValue(final Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(final Config conf, final String s) {
+ final List<String> res = Lists.newArrayList();
+
+ res
+ .add(
+ StringUtils.isBlank(s) ? RandomStringUtils.random(getParams().get("randomLength"))
+ : s.toLowerCase().replaceAll("\\s+", ""));
+
+ return res;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SuffixPrefix.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SuffixPrefix.java
new file mode 100644
index 000000000..2a1c023a9
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SuffixPrefix.java
@@ -0,0 +1,42 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("suffixprefix")
+public class SuffixPrefix extends AbstractClusteringFunction {
+
+ public SuffixPrefix(Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(Config conf, String s) {
+ return suffixPrefix(s, param("len"), param("max"));
+ }
+
+ private Collection<String> suffixPrefix(String s, int len, int max) {
+ final Set<String> bigrams = Sets.newLinkedHashSet();
+ int i = 0;
+ while (++i < s.length() && bigrams.size() < max) {
+ int j = s.indexOf(" ", i);
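+ // j is the next word boundary: combine the last 'len' chars of the current word with the first 'len' chars of the following one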
+
+ int offset = j + len + 1 < s.length() ? j + len + 1 : s.length();
+
+ if (j - len > 0) {
+ String bigram = s.substring(j - len, offset).replaceAll(" ", "").trim();
+ if (bigram.length() >= 4) {
+ bigrams.add(bigram);
+ }
+ }
+ }
+ return bigrams;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/UrlClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/UrlClustering.java
new file mode 100644
index 000000000..5b267ad10
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/UrlClustering.java
@@ -0,0 +1,52 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import eu.dnetlib.pace.common.AbstractPaceFunctions;
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("urlclustering")
+public class UrlClustering extends AbstractPaceFunctions implements ClusteringFunction {
+
+ protected Map<String, Integer> params;
+
+ public UrlClustering(final Map<String, Integer> params) {
+ this.params = params;
+ }
+
+ @Override
+ public Collection<String> apply(final Config conf, List<String> fields) {
+ try {
+ return fields
+ .stream()
+ .filter(f -> !f.isEmpty())
+ .map(this::asUrl)
+ .map(URL::getHost)
+ .collect(Collectors.toCollection(HashSet::new));
+ } catch (IllegalStateException e) {
+ return new HashSet<>();
+ }
+ }
+
+ @Override
+ public Map<String, Integer> getParams() {
+ return null;
+ }
+
+ private URL asUrl(String value) {
+ try {
+ return new URL(value);
+ } catch (MalformedURLException e) {
+ // should not happen as checked by pace typing
+ throw new IllegalStateException("invalid URL: " + value);
+ }
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java
new file mode 100644
index 000000000..c8e02f8f0
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java
@@ -0,0 +1,91 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("wordsStatsSuffixPrefixChain")
+public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
+
+ public WordsStatsSuffixPrefixChain(Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(Config conf, String s) {
+ return suffixPrefixChain(s, param("mod"));
+ }
+
+ private Collection<String> suffixPrefixChain(String s, int mod) {
+
+ // create the list of words from the string (remove short words)
+ List<String> wordsList = Arrays
+ .stream(s.split(" "))
+ .filter(si -> si.length() > 3)
+ .collect(Collectors.toList());
+
+ final int words = wordsList.size();
+ final int letters = s.length();
+
+ // create the prefix: number of words + number of letters/mod
+ String prefix = words + "-" + letters / mod + "-";
+
+ return doSuffixPrefixChain(wordsList, prefix);
+
+ }
+
+ private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) {
+
+ Set<String> set = Sets.newLinkedHashSet();
+ switch (wordsList.size()) {
+ case 0:
+ case 1:
+ break;
+ case 2:
+ set
+ .add(
+ prefix +
+ suffix(wordsList.get(0), 3) +
+ prefix(wordsList.get(1), 3));
+
+ set
+ .add(
+ prefix +
+ prefix(wordsList.get(0), 3) +
+ suffix(wordsList.get(1), 3));
+
+ break;
+ default:
+ set
+ .add(
+ prefix +
+ suffix(wordsList.get(0), 3) +
+ prefix(wordsList.get(1), 3) +
+ suffix(wordsList.get(2), 3));
+
+ set
+ .add(
+ prefix +
+ prefix(wordsList.get(0), 3) +
+ suffix(wordsList.get(1), 3) +
+ prefix(wordsList.get(2), 3));
+ break;
+ }
+
+ return set;
+
+ }
+
+ private String suffix(String s, int len) {
+ return s.substring(s.length() - len);
+ }
+
+ private String prefix(String s, int len) {
+ return s.substring(0, len);
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsSuffixPrefix.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsSuffixPrefix.java
new file mode 100644
index 000000000..e606590a5
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsSuffixPrefix.java
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.pace.clustering;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.config.Config;
+
+@ClusteringClass("wordssuffixprefix")
+public class WordsSuffixPrefix extends AbstractClusteringFunction {
+
+ public WordsSuffixPrefix(Map<String, Integer> params) {
+ super(params);
+ }
+
+ @Override
+ protected Collection<String> doApply(Config conf, String s) {
+ return suffixPrefix(s, param("len"), param("max"));
+ }
+
+ private Collection<String> suffixPrefix(String s, int len, int max) {
+
+ final int words = s.split(" ").length;
+
+ // adjust the token length according to the number of words
+ switch (words) {
+ case 1:
+ return Sets.newLinkedHashSet();
+ case 2:
+ return doSuffixPrefix(s, len + 2, max, words);
+ case 3:
+ return doSuffixPrefix(s, len + 1, max, words);
+ default:
+ return doSuffixPrefix(s, len, max, words);
+ }
+ }
+
+ private Collection<String> doSuffixPrefix(String s, int len, int max, int words) {
+ final Set<String> bigrams = Sets.newLinkedHashSet();
+ int i = 0;
+ while (++i < s.length() && bigrams.size() < max) {
+ int j = s.indexOf(" ", i);
+
+ int offset = j + len + 1 < s.length() ? j + len + 1 : s.length();
+
+ if (j - len > 0) {
+ String bigram = s.substring(j - len, offset).replaceAll(" ", "").trim();
+ if (bigram.length() >= 4) {
+ bigrams.add(words + bigram);
+ }
+ }
+ }
+ return bigrams;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java
new file mode 100644
index 000000000..b440686de
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java
@@ -0,0 +1,357 @@
+
+package eu.dnetlib.pace.common;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.text.Normalizer;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.ibm.icu.text.Transliterator;
+
+import eu.dnetlib.pace.clustering.NGramUtils;
+
+/**
+ * Set of common functions for the framework
+ *
+ * @author claudio
+ */
+public abstract class AbstractPaceFunctions {
+
+ // city map to be used when translating the city names into codes
+ private static Map<String, String> cityMap = AbstractPaceFunctions
+ .loadMapFromClasspath("/eu/dnetlib/pace/config/city_map.csv");
+
+ // list of stopwords in different languages
+ protected static Set<String> stopwords_gr = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_gr.txt");
+ protected static Set<String> stopwords_en = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_en.txt");
+ protected static Set<String> stopwords_de = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_de.txt");
+ protected static Set<String> stopwords_es = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_es.txt");
+ protected static Set<String> stopwords_fr = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_fr.txt");
+ protected static Set<String> stopwords_it = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_it.txt");
+ protected static Set<String> stopwords_pt = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_pt.txt");
+
+ // transliterator
+ protected static Transliterator transliterator = Transliterator.getInstance("Any-Eng");
+
+ // blacklist of ngrams: to avoid generic keys
+ protected static Set<String> ngramBlacklist = loadFromClasspath("/eu/dnetlib/pace/config/ngram_blacklist.txt");
+
+ // html regex for normalization
+ public static final Pattern HTML_REGEX = Pattern.compile("<[^>]*>");
+
+ private static final String alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ";
+ private static final String aliases_from = "⁰¹²³⁴⁵⁶⁷⁸⁹⁺⁻⁼⁽⁾ⁿ₀₁₂₃₄₅₆₇₈₉₊₋₌₍₎àáâäæãåāèéêëēėęəîïíīįìôöòóœøōõûüùúūßśšłžźżçćčñń";
+ private static final String aliases_to = "0123456789+-=()n0123456789+-=()aaaaaaaaeeeeeeeeiiiiiioooooooouuuuussslzzzcccnn";
+
+ // doi prefix for normalization
+ public static final Pattern DOI_PREFIX = Pattern.compile("(https?:\\/\\/dx\\.doi\\.org\\/)|(doi:)");
+
+ private static Pattern numberPattern = Pattern.compile("-?\\d+(\\.\\d+)?");
+
+ private static Pattern hexUnicodePattern = Pattern.compile("\\\\u(\\p{XDigit}{4})");
+
+ protected String concat(final List<String> l) {
+ return Joiner.on(" ").skipNulls().join(l);
+ }
+
+ protected String cleanup(final String s) {
+ final String s1 = HTML_REGEX.matcher(s).replaceAll("");
+ final String s2 = unicodeNormalization(s1.toLowerCase());
+ final String s3 = nfd(s2);
+ final String s4 = fixXML(s3);
+ final String s5 = s4.replaceAll("([0-9]+)", " $1 ");
+ final String s6 = transliterate(s5);
+ final String s7 = fixAliases(s6);
+ final String s8 = s7.replaceAll("[^\\p{ASCII}]", "");
+ final String s9 = s8.replaceAll("[\\p{Punct}]", " ");
+ final String s10 = s9.replaceAll("\\n", " ");
+ final String s11 = s10.replaceAll("(?m)\\s+", " ");
+ final String s12 = s11.trim();
+ return s12;
+ }
+
+ protected String fixXML(final String a) {
+
+ return a
+ .replaceAll("–", " ")
+ .replaceAll("&", " ")
+ .replaceAll(""", " ")
+ .replaceAll("−", " ");
+ }
+
+ protected boolean checkNumbers(final String a, final String b) {
+ final String numbersA = getNumbers(a);
+ final String numbersB = getNumbers(b);
+ final String romansA = getRomans(a);
+ final String romansB = getRomans(b);
+ return !numbersA.equals(numbersB) || !romansA.equals(romansB);
+ }
+
+ protected String getRomans(final String s) {
+ final StringBuilder sb = new StringBuilder();
+ for (final String t : s.split(" ")) {
+ sb.append(isRoman(t) ? t : "");
+ }
+ return sb.toString();
+ }
+
+ protected boolean isRoman(final String s) {
+ return s
+ .replaceAll("^M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$", "qwertyuiop")
+ .equals("qwertyuiop");
+ }
+
+ protected String getNumbers(final String s) {
+ final StringBuilder sb = new StringBuilder();
+ for (final String t : s.split(" ")) {
+ sb.append(isNumber(t) ? t : "");
+ }
+ return sb.toString();
+ }
+
+ public boolean isNumber(String strNum) {
+ if (strNum == null) {
+ return false;
+ }
+ return numberPattern.matcher(strNum).matches();
+ }
+
+ protected static String fixAliases(final String s) {
+ final StringBuilder sb = new StringBuilder();
+
+ s.chars().forEach(ch -> {
+ final int i = StringUtils.indexOf(aliases_from, ch);
+ sb.append(i >= 0 ? aliases_to.charAt(i) : (char) ch);
+ });
+
+ return sb.toString();
+ }
+
+ protected static String transliterate(final String s) {
+ try {
+ return transliterator.transliterate(s);
+ } catch (Exception e) {
+ return s;
+ }
+ }
+
+ protected String removeSymbols(final String s) {
+ final StringBuilder sb = new StringBuilder();
+
+ s.chars().forEach(ch -> {
+ sb.append(StringUtils.contains(alpha, ch) ? (char) ch : ' ');
+ });
+
+ return sb.toString().replaceAll("\\s+", " ");
+ }
+
+ protected boolean notNull(final String s) {
+ return s != null;
+ }
+
+ protected String normalize(final String s) {
+ return fixAliases(transliterate(nfd(unicodeNormalization(s))))
+ .toLowerCase()
+ // do not compact the regexes in a single expression, would cause StackOverflowError in case of large input
+ // strings
+ .replaceAll("[^ \\w]+", "")
+ .replaceAll("(\\p{InCombiningDiacriticalMarks})+", "")
+ .replaceAll("(\\p{Punct})+", " ")
+ .replaceAll("(\\d)+", " ")
+ .replaceAll("(\\n)+", " ")
+ .trim();
+ }
+
+ public String nfd(final String s) {
+ return Normalizer.normalize(s, Normalizer.Form.NFD);
+ }
+
+ public String utf8(final String s) {
+ byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ public String unicodeNormalization(final String s) {
+
+ Matcher m = hexUnicodePattern.matcher(s);
+ StringBuffer buf = new StringBuffer(s.length());
+ while (m.find()) {
+ String ch = String.valueOf((char) Integer.parseInt(m.group(1), 16));
+ m.appendReplacement(buf, Matcher.quoteReplacement(ch));
+ }
+ m.appendTail(buf);
+ return buf.toString();
+ }
+
+ protected String filterStopWords(final String s, final Set<String> stopwords) {
+ final StringTokenizer st = new StringTokenizer(s);
+ final StringBuilder sb = new StringBuilder();
+ while (st.hasMoreTokens()) {
+ final String token = st.nextToken();
+ if (!stopwords.contains(token)) {
+ sb.append(token);
+ sb.append(" ");
+ }
+ }
+ return sb.toString().trim();
+ }
+
+ public String filterAllStopWords(String s) {
+
+ s = filterStopWords(s, stopwords_en);
+ s = filterStopWords(s, stopwords_de);
+ s = filterStopWords(s, stopwords_it);
+ s = filterStopWords(s, stopwords_fr);
+ s = filterStopWords(s, stopwords_pt);
+ s = filterStopWords(s, stopwords_es);
+ s = filterStopWords(s, stopwords_gr);
+
+ return s;
+ }
+
+ protected Collection<String> filterBlacklisted(final Collection<String> set, final Set<String> ngramBlacklist) {
+ final Set<String> newset = Sets.newLinkedHashSet();
+ for (final String s : set) {
+ if (!ngramBlacklist.contains(s)) {
+ newset.add(s);
+ }
+ }
+ return newset;
+ }
+
+ public static Set<String> loadFromClasspath(final String classpath) {
+
+ Transliterator transliterator = Transliterator.getInstance("Any-Eng");
+
+ final Set<String> h = Sets.newHashSet();
+ try {
+ for (final String s : IOUtils
+ .readLines(NGramUtils.class.getResourceAsStream(classpath), StandardCharsets.UTF_8)) {
+ h.add(fixAliases(transliterator.transliterate(s))); // transliteration of the stopwords
+ }
+ } catch (final Throwable e) {
+ return Sets.newHashSet();
+ }
+ return h;
+ }
+
+ public static Map<String, String> loadMapFromClasspath(final String classpath) {
+
+ Transliterator transliterator = Transliterator.getInstance("Any-Eng");
+
+ final Map<String, String> m = new HashMap<>();
+ try {
+ for (final String s : IOUtils
+ .readLines(AbstractPaceFunctions.class.getResourceAsStream(classpath), StandardCharsets.UTF_8)) {
+ // string is like this: code;word1;word2;word3
+ String[] line = s.split(";");
+ String value = line[0];
+ for (int i = 1; i < line.length; i++) {
+ m.put(fixAliases(transliterator.transliterate(line[i].toLowerCase())), value);
+ }
+ }
+ } catch (final Throwable e) {
+ return new HashMap<>();
+ }
+ return m;
+ }
+
+ public String removeKeywords(String s, Set<String> keywords) {
+
+ s = " " + s + " ";
+ for (String k : keywords) {
+ s = s.replaceAll(k.toLowerCase(), "");
+ }
+
+ return s.trim();
+ }
+
+ public double commonElementsPercentage(Set<String> s1, Set<String> s2) {
+
+ double longer = Math.max(s1.size(), s2.size());
+ return (double) s1.stream().filter(s2::contains).count() / longer;
+ }
+
+ // convert the set of keywords to codes
+ public Set<String> toCodes(Set<String> keywords, Map<String, String> translationMap) {
+ return keywords.stream().map(s -> translationMap.get(s)).collect(Collectors.toSet());
+ }
+
+ public Set<String> keywordsToCodes(Set<String> keywords, Map<String, String> translationMap) {
+ return toCodes(keywords, translationMap);
+ }
+
+ public Set<String> citiesToCodes(Set<String> keywords) {
+ return toCodes(keywords, cityMap);
+ }
+
+ protected String firstLC(final String s) {
+ return StringUtils.substring(s, 0, 1).toLowerCase();
+ }
+
+ protected Iterable<String> tokens(final String s, final int maxTokens) {
+ return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), maxTokens);
+ }
+
+ public String normalizePid(String pid) {
+ return DOI_PREFIX.matcher(pid.toLowerCase()).replaceAll("");
+ }
+
+ // get the list of keywords into the input string
+ public Set<String> getKeywords(String s1, Map<String, String> translationMap, int windowSize) {
+
+ String s = s1;
+
+ List<String> tokens = Arrays.asList(s.toLowerCase().split(" "));
+
+ Set<String> codes = new HashSet<>();
+
+ if (tokens.size() < windowSize)
+ windowSize = tokens.size();
+
+ int length = windowSize;
+
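+ // shrinking-window scan: try to match the longest token sequences against the translation map first, then shorter ones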
+ while (length != 0) {
+
+ for (int i = 0; i <= tokens.size() - length; i++) {
+ String candidate = concat(tokens.subList(i, i + length));
+ if (translationMap.containsKey(candidate)) {
+ codes.add(candidate);
+ s = s.replace(candidate, "").trim();
+ }
+ }
+
+ tokens = Arrays.asList(s.split(" "));
+ length -= 1;
+ }
+
+ return codes;
+ }
+
+ public Set<String> getCities(String s1, int windowSize) {
+ return getKeywords(s1, cityMap, windowSize);
+ }
+
+ public static String readFromClasspath(final String filename, final Class<?> clazz) {
+ final StringWriter sw = new StringWriter();
+ try {
+ IOUtils.copy(clazz.getResourceAsStream(filename), sw, StandardCharsets.UTF_8);
+ return sw.toString();
+ } catch (final IOException e) {
+ throw new RuntimeException("cannot load resource from classpath: " + filename);
+ }
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/Config.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/Config.java
new file mode 100644
index 000000000..4d823d129
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/Config.java
@@ -0,0 +1,53 @@
+
+package eu.dnetlib.pace.config;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import eu.dnetlib.pace.model.ClusteringDef;
+import eu.dnetlib.pace.model.FieldDef;
+import eu.dnetlib.pace.tree.support.TreeNodeDef;
+
+/**
+ * Interface for PACE configuration bean.
+ *
+ * @author claudio
+ */
+public interface Config {
+
+ /**
+ * Field configuration definitions.
+ *
+ * @return the list of definitions
+ */
+ public List<FieldDef> model();
+
+ /**
+ * Decision Tree definition
+ *
+ * @return the map representing the decision tree
+ */
+ public Map<String, TreeNodeDef> decisionTree();
+
+ /**
+ * Clusterings.
+ *
+ * @return the list
+ */
+ public List<ClusteringDef> clusterings();
+
+ /**
+ * Blacklists.
+ *
+ * @return the map
+ */
+ public Map<String, Predicate<String>> blacklists();
+
+ /**
+ * Translation map.
+ *
+ * @return the map
+ * */
+ public Map<String, String> translationMap();
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/DedupConfig.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/DedupConfig.java
new file mode 100644
index 000000000..ac0ef08e4
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/DedupConfig.java
@@ -0,0 +1,178 @@
+
+package eu.dnetlib.pace.config;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import java.util.stream.Collectors;
+
+import org.antlr.stringtemplate.StringTemplate;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+
+import eu.dnetlib.pace.model.ClusteringDef;
+import eu.dnetlib.pace.model.FieldDef;
+import eu.dnetlib.pace.tree.support.TreeNodeDef;
+import eu.dnetlib.pace.util.PaceException;
+
+public class DedupConfig implements Config, Serializable {
+ private static String CONFIG_TEMPLATE = "dedupConfig.st";
+
+ private PaceConfig pace;
+
+ private WfConfig wf;
+
+ @JsonIgnore
+ private Map<String, Predicate<String>> blacklists;
+
+ private static Map<String, String> defaults = Maps.newHashMap();
+
+ static {
+ defaults.put("dedupRun", "001");
+ defaults.put("entityType", "result");
+ defaults.put("subEntityType", "resulttype");
+ defaults.put("subEntityValue", "publication");
+ defaults.put("orderField", "title");
+ defaults.put("queueMaxSize", "2000");
+ defaults.put("groupMaxSize", "10");
+ defaults.put("slidingWindowSize", "200");
+ defaults.put("rootBuilder", "result");
+ defaults.put("includeChildren", "true");
+ defaults.put("maxIterations", "20");
+ defaults.put("idPath", "$.id");
+ }
+
+ public DedupConfig() {
+ }
+
+ public static DedupConfig load(final String json) {
+
+ final DedupConfig config;
+ try {
+ config = new ObjectMapper().readValue(json, DedupConfig.class);
+ config.getPace().initModel();
+ config.getPace().initTranslationMap();
+
+ config.blacklists = config
+ .getPace()
+ .getBlacklists()
+ .entrySet()
+ .stream()
+ .map(
+ e -> new AbstractMap.SimpleEntry<String, List<Pattern>>(e.getKey(),
+ e
+ .getValue()
+ .stream()
+ .filter(s -> !StringUtils.isBlank(s))
+ .map(Pattern::compile)
+ .collect(Collectors.toList())))
+ .collect(
+ Collectors
+ .toMap(
+ e -> e.getKey(),
+ e -> (Predicate<String> & Serializable) s -> e
+ .getValue()
+ .stream()
+ .filter(p -> p.matcher(s).matches())
+ .findFirst()
+ .isPresent()))
+
+ ;
+
+ return config;
+ } catch (IOException | PatternSyntaxException e) {
+ throw new PaceException("Error in parsing configuration json", e);
+ }
+
+ }
+
+ public static DedupConfig loadDefault() throws IOException {
+ return loadDefault(new HashMap<String, String>());
+ }
+
+ public static DedupConfig loadDefault(final Map<String, String> params) throws IOException {
+
+ final StringTemplate template = new StringTemplate(new DedupConfig().readFromClasspath(CONFIG_TEMPLATE));
+
+ for (final Entry<String, String> e : defaults.entrySet()) {
+ template.setAttribute(e.getKey(), e.getValue());
+ }
+ for (final Entry<String, String> e : params.entrySet()) {
+ if (template.getAttribute(e.getKey()) != null) {
+ template.getAttributes().computeIfPresent(e.getKey(), (o, o2) -> e.getValue());
+ } else {
+ template.setAttribute(e.getKey(), e.getValue());
+ }
+ }
+
+ final String json = template.toString();
+ return load(json);
+ }
+
+ private String readFromClasspath(final String resource) throws IOException {
+ return IOUtils.toString(getClass().getResource(resource), StandardCharsets.UTF_8);
+ }
+
+ public PaceConfig getPace() {
+ return pace;
+ }
+
+ public void setPace(final PaceConfig pace) {
+ this.pace = pace;
+ }
+
+ public WfConfig getWf() {
+ return wf;
+ }
+
+ public void setWf(final WfConfig wf) {
+ this.wf = wf;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (IOException e) {
+ throw new PaceException("unable to serialise configuration", e);
+ }
+ }
+
+ @Override
+ public Map<String, TreeNodeDef> decisionTree() {
+ return getPace().getDecisionTree();
+ }
+
+ @Override
+ public List<FieldDef> model() {
+ return getPace().getModel();
+ }
+
+ @Override
+ public List<ClusteringDef> clusterings() {
+ return getPace().getClustering();
+ }
+
+ @Override
+ public Map<String, Predicate<String>> blacklists() {
+ return blacklists;
+ }
+
+ @Override
+ public Map<String, String> translationMap() {
+ return getPace().translationMap();
+ }
+
+}
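
`DedupConfig.load` deserialises the JSON, initialises the field model and the synonym translation map, and compiles the blacklist regexes into serialisable predicates. A hedged, minimal example of the expected JSON shape (hypothetical values; real configurations also define clusterings, a decision tree and blacklist patterns):

```
import eu.dnetlib.pace.config.DedupConfig;

// Hypothetical sketch: a bare-bones configuration, just enough for load(...) to succeed
public class LoadConfigSketch {

    public static void main(String[] args) {
        String json = "{"
            + "\"wf\": {\"entityType\": \"result\", \"orderField\": \"title\", \"idPath\": \"$.id\","
            + "\"dedupRun\": \"001\", \"queueMaxSize\": 200, \"slidingWindowSize\": 100},"
            + "\"pace\": {"
            + "\"model\": [{\"name\": \"title\", \"type\": \"String\", \"path\": \"$.title\"}],"
            + "\"clustering\": [],"
            + "\"decisionTree\": {},"
            + "\"blacklists\": {},"
            + "\"synonyms\": {}"
            + "}}";

        DedupConfig conf = DedupConfig.load(json);

        System.out.println(conf.getWf().getOrderField()); // title
        System.out.println(conf.model().size());          // 1
    }
}
```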
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/PaceConfig.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/PaceConfig.java
new file mode 100644
index 000000000..f1bc49f4a
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/PaceConfig.java
@@ -0,0 +1,108 @@
+
+package eu.dnetlib.pace.config;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Maps;
+import com.ibm.icu.text.Transliterator;
+
+import eu.dnetlib.pace.common.AbstractPaceFunctions;
+import eu.dnetlib.pace.model.ClusteringDef;
+import eu.dnetlib.pace.model.FieldDef;
+import eu.dnetlib.pace.tree.support.TreeNodeDef;
+import eu.dnetlib.pace.util.PaceResolver;
+
+public class PaceConfig extends AbstractPaceFunctions implements Serializable {
+
+ private List<FieldDef> model;
+
+ private List<ClusteringDef> clustering;
+ private Map<String, TreeNodeDef> decisionTree;
+
+ private Map<String, List<String>> blacklists;
+ private Map<String, List<String>> synonyms;
+
+ @JsonIgnore
+ private Map<String, String> translationMap;
+
+ public Map<String, FieldDef> getModelMap() {
+ return modelMap;
+ }
+
+ @JsonIgnore
+ private Map<String, FieldDef> modelMap;
+
+ @JsonIgnore
+ public static PaceResolver resolver = new PaceResolver();
+
+ public PaceConfig() {
+ }
+
+ public void initModel() {
+ modelMap = Maps.newHashMap();
+ for (FieldDef fd : getModel()) {
+ modelMap.put(fd.getName(), fd);
+ }
+ }
+
+ public void initTranslationMap() {
+ translationMap = Maps.newHashMap();
+
+ Transliterator transliterator = Transliterator.getInstance("Any-Eng");
+ for (String key : synonyms.keySet()) {
+ for (String term : synonyms.get(key)) {
+ translationMap
+ .put(
+ fixAliases(transliterator.transliterate(term.toLowerCase())),
+ key);
+ }
+ }
+ }
+
+ public Map<String, String> translationMap() {
+ return translationMap;
+ }
+
+ public List<FieldDef> getModel() {
+ return model;
+ }
+
+ public void setModel(final List<FieldDef> model) {
+ this.model = model;
+ }
+
+ public List<ClusteringDef> getClustering() {
+ return clustering;
+ }
+
+ public void setClustering(final List<ClusteringDef> clustering) {
+ this.clustering = clustering;
+ }
+
+ public Map<String, TreeNodeDef> getDecisionTree() {
+ return decisionTree;
+ }
+
+ public void setDecisionTree(Map<String, TreeNodeDef> decisionTree) {
+ this.decisionTree = decisionTree;
+ }
+
+ public Map<String, List<String>> getBlacklists() {
+ return blacklists;
+ }
+
+ public void setBlacklists(final Map<String, List<String>> blacklists) {
+ this.blacklists = blacklists;
+ }
+
+ public Map<String, List<String>> getSynonyms() {
+ return synonyms;
+ }
+
+ public void setSynonyms(Map<String, List<String>> synonyms) {
+ this.synonyms = synonyms;
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/Type.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/Type.java
new file mode 100644
index 000000000..9f3323edc
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/Type.java
@@ -0,0 +1,6 @@
+
+package eu.dnetlib.pace.config;
+
+public enum Type {
+ String, Int, List, JSON, URL, StringConcat, DoubleArray
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/WfConfig.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/WfConfig.java
new file mode 100644
index 000000000..8dea04232
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/config/WfConfig.java
@@ -0,0 +1,294 @@
+
+package eu.dnetlib.pace.config;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.util.PaceException;
+
+public class WfConfig implements Serializable {
+
+ /**
+ * Entity type.
+ */
+ private String entityType = "";
+
+ /**
+ * Sub-Entity type refers to one of the fields declared in the model. See eu.dnetlib.pace.config.PaceConfig.modelMap
+ */
+ private String subEntityType = "";
+
+ /**
+ * Sub-Entity value declares a value for subTypes to be considered.
+ */
+ private String subEntityValue = "";
+
+ /**
+ * Field name used to sort the values in the reducer phase.
+ */
+ private String orderField = "";
+
+ /**
+ * Column Families involved in the relations redirection.
+ */
+ private List<String> rootBuilder = Lists.newArrayList();
+
+ /**
+ * Set of datasource namespace prefixes that won't be deduplicated.
+ */
+ private Set<String> skipList = Sets.newHashSet();
+
+ /**
+ * Subprefix used to build the root id, allows multiple dedup runs.
+ */
+ private String dedupRun = "";
+
+ /**
+ * Similarity threshold.
+ */
+ private double threshold = 0;
+
+ /** The queue max size. */
+ private int queueMaxSize = 2000;
+
+ /** The group max size. */
+ private int groupMaxSize;
+
+ /** The sliding window size. */
+ private int slidingWindowSize;
+
+ /** The configuration id. */
+ private String configurationId;
+
+ /** The include children. */
+ private boolean includeChildren;
+
+ /** Default maximum number of allowed children. */
+ private final static int MAX_CHILDREN = 10;
+
+ /** Maximum number of allowed children. */
+ private int maxChildren = MAX_CHILDREN;
+
+ /** Default maximum number of iterations. */
+ private final static int MAX_ITERATIONS = 20;
+
+ /** Maximum number of iterations */
+ private int maxIterations = MAX_ITERATIONS;
+
+ /** The Jquery path to retrieve the identifier */
+ private String idPath = "$.id";
+
+ public WfConfig() {
+ }
+
+ /**
+ * Instantiates a new dedup config.
+ *
+ * @param entityType
+ * the entity type
+ * @param orderField
+ * the order field
+ * @param rootBuilder
+ * the root builder families
+ * @param dedupRun
+ * the dedup run
+ * @param skipList
+ * the skip list
+ * @param queueMaxSize
+ * the queue max size
+ * @param groupMaxSize
+ * the group max size
+ * @param slidingWindowSize
+ * the sliding window size
+ * @param includeChildren
+ * allows the children to be included in the representative records or not.
+ * @param maxIterations
+ * the maximum number of iterations
+ * @param idPath
+ * the path for the id of the entity
+ */
+ public WfConfig(final String entityType, final String orderField, final List<String> rootBuilder,
+ final String dedupRun,
+ final Set<String> skipList, final int queueMaxSize, final int groupMaxSize, final int slidingWindowSize,
+ final boolean includeChildren, final int maxIterations, final String idPath) {
+ super();
+ this.entityType = entityType;
+ this.orderField = orderField;
+ this.rootBuilder = rootBuilder;
+ this.dedupRun = cleanupStringNumber(dedupRun);
+ this.skipList = skipList;
+ this.queueMaxSize = queueMaxSize;
+ this.groupMaxSize = groupMaxSize;
+ this.slidingWindowSize = slidingWindowSize;
+ this.includeChildren = includeChildren;
+ this.maxIterations = maxIterations;
+ this.idPath = idPath;
+ }
+
+ /**
+ * Cleanup string number.
+ *
+ * @param s
+ * the s
+ * @return the string
+ */
+ private String cleanupStringNumber(final String s) {
+ return s.contains("'") ? s.replaceAll("'", "") : s;
+ }
+
+ public boolean hasSubType() {
+ return StringUtils.isNotBlank(getSubEntityType()) && StringUtils.isNotBlank(getSubEntityValue());
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ public void setEntityType(final String entityType) {
+ this.entityType = entityType;
+ }
+
+ public String getSubEntityType() {
+ return subEntityType;
+ }
+
+ public void setSubEntityType(final String subEntityType) {
+ this.subEntityType = subEntityType;
+ }
+
+ public String getSubEntityValue() {
+ return subEntityValue;
+ }
+
+ public void setSubEntityValue(final String subEntityValue) {
+ this.subEntityValue = subEntityValue;
+ }
+
+ public String getOrderField() {
+ return orderField;
+ }
+
+ public void setOrderField(final String orderField) {
+ this.orderField = orderField;
+ }
+
+ public List<String> getRootBuilder() {
+ return rootBuilder;
+ }
+
+ public void setRootBuilder(final List<String> rootBuilder) {
+ this.rootBuilder = rootBuilder;
+ }
+
+ public Set<String> getSkipList() {
+ return skipList != null ? skipList : new HashSet<String>();
+ }
+
+ public void setSkipList(final Set<String> skipList) {
+ this.skipList = skipList;
+ }
+
+ public String getDedupRun() {
+ return dedupRun;
+ }
+
+ public void setDedupRun(final String dedupRun) {
+ this.dedupRun = dedupRun;
+ }
+
+ public double getThreshold() {
+ return threshold;
+ }
+
+ public void setThreshold(final double threshold) {
+ this.threshold = threshold;
+ }
+
+ public int getQueueMaxSize() {
+ return queueMaxSize;
+ }
+
+ public void setQueueMaxSize(final int queueMaxSize) {
+ this.queueMaxSize = queueMaxSize;
+ }
+
+ public int getGroupMaxSize() {
+ return groupMaxSize;
+ }
+
+ public void setGroupMaxSize(final int groupMaxSize) {
+ this.groupMaxSize = groupMaxSize;
+ }
+
+ public int getSlidingWindowSize() {
+ return slidingWindowSize;
+ }
+
+ public void setSlidingWindowSize(final int slidingWindowSize) {
+ this.slidingWindowSize = slidingWindowSize;
+ }
+
+ public String getConfigurationId() {
+ return configurationId;
+ }
+
+ public void setConfigurationId(final String configurationId) {
+ this.configurationId = configurationId;
+ }
+
+ public boolean isIncludeChildren() {
+ return includeChildren;
+ }
+
+ public void setIncludeChildren(final boolean includeChildren) {
+ this.includeChildren = includeChildren;
+ }
+
+ public int getMaxChildren() {
+ return maxChildren;
+ }
+
+ public void setMaxChildren(final int maxChildren) {
+ this.maxChildren = maxChildren;
+ }
+
+ public int getMaxIterations() {
+ return maxIterations;
+ }
+
+ public void setMaxIterations(int maxIterations) {
+ this.maxIterations = maxIterations;
+ }
+
+ public String getIdPath() {
+ return idPath;
+ }
+
+ public void setIdPath(String idPath) {
+ this.idPath = idPath;
+
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (IOException e) {
+ throw new PaceException("unable to serialise " + this.getClass().getName(), e);
+ }
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java
new file mode 100644
index 000000000..d9ad81d42
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java
@@ -0,0 +1,63 @@
+
+package eu.dnetlib.pace.model;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.pace.clustering.ClusteringFunction;
+import eu.dnetlib.pace.config.PaceConfig;
+import eu.dnetlib.pace.util.PaceException;
+
+public class ClusteringDef implements Serializable {
+
+ private String name;
+
+ private List<String> fields;
+
+ private Map<String, Object> params;
+
+ public ClusteringDef() {
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public ClusteringFunction clusteringFunction() {
+ return PaceConfig.resolver.getClusteringFunction(getName(), params);
+ }
+
+ public List<String> getFields() {
+ return fields;
+ }
+
+ public void setFields(final List<String> fields) {
+ this.fields = fields;
+ }
+
+ public Map<String, Object> getParams() {
+ return params;
+ }
+
+ public void setParams(final Map<String, Object> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (IOException e) {
+ throw new PaceException("unable to serialise " + this.getClass().getName(), e);
+ }
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java
new file mode 100644
index 000000000..f34545e6d
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java
@@ -0,0 +1,103 @@
+
+package eu.dnetlib.pace.model;
+
+import java.io.Serializable;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.pace.config.Type;
+
+/**
+ * The schema is composed by field definitions (FieldDef). Each field has a type, a name, and an associated compare algorithm.
+ */
+public class FieldDef implements Serializable {
+
+ public final static String PATH_SEPARATOR = "/";
+
+ private String name;
+
+ private String path;
+
+ private Type type;
+
+ private boolean overrideMatch;
+
+ /**
+ * Sets maximum size for the repeatable fields in the model. -1 for unbounded size.
+ */
+ private int size = -1;
+
+ /**
+ * Sets maximum length for field values in the model. -1 for unbounded length.
+ */
+ private int length = -1;
+
+ public FieldDef() {
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public List<String> getPathList() {
+ return Lists.newArrayList(Splitter.on(PATH_SEPARATOR).split(getPath()));
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public void setType(final Type type) {
+ this.type = type;
+ }
+
+ public boolean isOverrideMatch() {
+ return overrideMatch;
+ }
+
+ public void setOverrideMatch(final boolean overrideMatch) {
+ this.overrideMatch = overrideMatch;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public void setLength(int length) {
+ this.length = length;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ return null;
+ }
+ }
+
+}
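
`FieldDef` describes one field of the dedup model: `path` locates the value in the input record, `type` drives how it is parsed, and `size`/`length` cap repeatable fields and value lengths (-1 meaning unbounded). A small, hypothetical sketch that only exercises `getPathList` (the field name and path value are illustrative, not taken from a real configuration):

```
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.model.FieldDef;

// Hypothetical sketch of a field definition built programmatically
public class FieldDefSketch {

    public static void main(String[] args) {
        FieldDef fd = new FieldDef();
        fd.setName("authors");
        fd.setType(Type.List);
        fd.setPath("author/fullname"); // segments are split on PATH_SEPARATOR ("/")
        fd.setSize(200);               // keep at most 200 values of the repeatable field

        System.out.println(fd.getPathList()); // [author, fullname]
    }
}
```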
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/Person.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/Person.java
new file mode 100644
index 000000000..96120cf4d
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/Person.java
@@ -0,0 +1,156 @@
+
+package eu.dnetlib.pace.model;
+
+import java.nio.charset.Charset;
+import java.text.Normalizer;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+import eu.dnetlib.pace.common.AbstractPaceFunctions;
+import eu.dnetlib.pace.util.Capitalise;
+import eu.dnetlib.pace.util.DotAbbreviations;
+
+public class Person {
+
+ private static final String UTF8 = "UTF-8";
+ private List<String> name = Lists.newArrayList();
+ private List<String> surname = Lists.newArrayList();
+ private List<String> fullname = Lists.newArrayList();
+ private final String original;
+
+ private static Set<String> particles = null;
+
+ public Person(String s, final boolean aggressive) {
+ original = s;
+ s = Normalizer.normalize(s, Normalizer.Form.NFD);
+ s = s.replaceAll("\\(.+\\)", "");
+ s = s.replaceAll("\\[.+\\]", "");
+ s = s.replaceAll("\\{.+\\}", "");
+ s = s.replaceAll("\\s+-\\s+", "-");
+ s = s.replaceAll("[\\p{Punct}&&[^,-]]", " ");
+ s = s.replaceAll("\\d", " ");
+ s = s.replaceAll("\\n", " ");
+ s = s.replaceAll("\\.", " ");
+ s = s.replaceAll("\\s+", " ");
+
+ if (aggressive) {
+ s = s.replaceAll("[\\p{InCombiningDiacriticalMarks}&&[^,-]]", "");
+ // s = s.replaceAll("[\\W&&[^,-]]", "");
+ }
+
+ if (s.contains(",")) { // if the name contains a comma it is easy derivable the name and the surname
+ final String[] arr = s.split(",");
+ if (arr.length == 1) {
+ fullname = splitTerms(arr[0]);
+ } else if (arr.length > 1) {
+ surname = splitTerms(arr[0]);
+ name = splitTerms(arr[1]);
+ fullname.addAll(surname);
+ fullname.addAll(name);
+ }
+ } else {
+ fullname = splitTerms(s);
+
+ int lastInitialPosition = fullname.size();
+ boolean hasSurnameInUpperCase = false;
+
+ for (int i = 0; i < fullname.size(); i++) {
+ final String term = fullname.get(i);
+ if (term.length() == 1) {
+ lastInitialPosition = i;
+ } else if (term.equals(term.toUpperCase())) {
+ hasSurnameInUpperCase = true;
+ }
+ }
+
+ if (lastInitialPosition < (fullname.size() - 1)) { // Case: Michele G. Artini
+ name = fullname.subList(0, lastInitialPosition + 1);
+ surname = fullname.subList(lastInitialPosition + 1, fullname.size());
+ } else if (hasSurnameInUpperCase) { // Case: Michele ARTINI
+ for (final String term : fullname) {
+ if ((term.length() > 1) && term.equals(term.toUpperCase())) {
+ surname.add(term);
+ } else {
+ name.add(term);
+ }
+ }
+ }
+ }
+ }
+
+ private List<String> splitTerms(final String s) {
+ if (particles == null) {
+ particles = AbstractPaceFunctions.loadFromClasspath("/eu/dnetlib/pace/config/name_particles.txt");
+ }
+
+ final List<String> list = Lists.newArrayList();
+ for (final String part : Splitter.on(" ").omitEmptyStrings().split(s)) {
+ if (!particles.contains(part.toLowerCase())) {
+ list.add(part);
+ }
+ }
+ return list;
+ }
+
+ public List<String> getName() {
+ return name;
+ }
+
+ public String getNameString() {
+ return Joiner.on(" ").join(getName());
+ }
+
+ public List<String> getSurname() {
+ return surname;
+ }
+
+ public List<String> getFullname() {
+ return fullname;
+ }
+
+ public String getOriginal() {
+ return original;
+ }
+
+ public String hash() {
+ return Hashing.murmur3_128().hashString(getNormalisedFullname(), Charset.forName(UTF8)).toString();
+ }
+
+ public String getNormalisedFirstName() {
+ return Joiner.on(" ").join(getCapitalFirstnames());
+ }
+
+ public String getNormalisedSurname() {
+ return Joiner.on(" ").join(getCapitalSurname());
+ }
+
+ public String getSurnameString() {
+ return Joiner.on(" ").join(getSurname());
+ }
+
+ public String getNormalisedFullname() {
+ return isAccurate() ? getNormalisedSurname() + ", " + getNormalisedFirstName() : Joiner.on(" ").join(fullname);
+ }
+
+ public List<String> getCapitalFirstnames() {
+ return Lists.newArrayList(Iterables.transform(getNameWithAbbreviations(), new Capitalise()));
+ }
+
+ public List<String> getCapitalSurname() {
+ return Lists.newArrayList(Iterables.transform(surname, new Capitalise()));
+ }
+
+ public List<String> getNameWithAbbreviations() {
+ return Lists.newArrayList(Iterables.transform(name, new DotAbbreviations()));
+ }
+
+ public boolean isAccurate() {
+ return ((name != null) && (surname != null) && !name.isEmpty() && !surname.isEmpty());
+ }
+}
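
The constructor above distinguishes three input shapes: `"Surname, Name"`, a fullname whose surname is written entirely in upper case, and a fullname where only initials precede the surname. A small, hypothetical sketch of the first two cases (the example names are made up; exact capitalisation of the normalised output depends on the `Capitalise` helper):

```
import eu.dnetlib.pace.model.Person;

// Hypothetical sketch of the parsing branches described in the comments above
public class PersonSketch {

    public static void main(String[] args) {
        // comma form: surname and name are derived directly from the two parts
        Person p1 = new Person("Rossi, Mario", false);
        System.out.println(p1.getSurnameString()); // Rossi
        System.out.println(p1.getNameString());    // Mario
        System.out.println(p1.isAccurate());       // true

        // upper-case surname form: the all-caps token is taken as the surname
        Person p2 = new Person("Mario ROSSI", false);
        System.out.println(p2.getNormalisedFullname()); // e.g. "Rossi, Mario"
    }
}
```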
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/PersonComparatorUtils.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/PersonComparatorUtils.java
new file mode 100644
index 000000000..1f8aab4bf
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/PersonComparatorUtils.java
@@ -0,0 +1,119 @@
+
+package eu.dnetlib.pace.model;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class PersonComparatorUtils {
+
+ private static final int MAX_FULLNAME_LENGTH = 50;
+
+ public static Set<String> getNgramsForPerson(String fullname) {
+
+ Set<String> set = Sets.newHashSet();
+
+ if (fullname.length() > MAX_FULLNAME_LENGTH) {
+ return set;
+ }
+
+ Person p = new Person(fullname, true);
+
+ if (p.isAccurate()) {
+ for (String name : p.getName()) {
+ for (String surname : p.getSurname()) {
+ set.add((name.charAt(0) + "_" + surname).toLowerCase());
+ }
+ }
+ } else {
+ List<String> list = p.getFullname();
+ for (int i = 0; i < list.size(); i++) {
+ if (list.get(i).length() > 1) {
+ for (int j = 0; j < list.size(); j++) {
+ if (i != j) {
+ set.add((list.get(j).charAt(0) + "_" + list.get(i)).toLowerCase());
+ }
+ }
+ }
+ }
+ }
+
+ return set;
+ }
+
+ public static boolean areSimilar(String s1, String s2) {
+ Person p1 = new Person(s1, true);
+ Person p2 = new Person(s2, true);
+
+ if (p1.isAccurate() && p2.isAccurate()) {
+ return verifyNames(p1.getName(), p2.getName()) && verifySurnames(p1.getSurname(), p2.getSurname());
+ } else {
+ return verifyFullnames(p1.getFullname(), p2.getFullname());
+ }
+ }
+
+ private static boolean verifyNames(List<String> list1, List<String> list2) {
+ return verifySimilarity(extractExtendedNames(list1), extractExtendedNames(list2))
+ && verifySimilarity(extractInitials(list1), extractInitials(list2));
+ }
+
+ private static boolean verifySurnames(List<String> list1, List<String> list2) {
+ if (list1.size() != list2.size()) {
+ return false;
+ }
+ for (int i = 0; i < list1.size(); i++) {
+ if (!list1.get(i).equalsIgnoreCase(list2.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean verifyFullnames(List<String> list1, List<String> list2) {
+ Collections.sort(list1);
+ Collections.sort(list2);
+ return verifySimilarity(extractExtendedNames(list1), extractExtendedNames(list2))
+ && verifySimilarity(extractInitials(list1), extractInitials(list2));
+ }
+
+ private static List<String> extractExtendedNames(List<String> list) {
+ ArrayList<String> res = Lists.newArrayList();
+ for (String s : list) {
+ if (s.length() > 1) {
+ res.add(s.toLowerCase());
+ }
+ }
+ return res;
+ }
+
+ private static List<String> extractInitials(List<String> list) {
+ ArrayList<String> res = Lists.newArrayList();
+ for (String s : list) {
+ res.add(s.substring(0, 1).toLowerCase());
+ }
+ return res;
+ }
+
+ private static boolean verifySimilarity(List<String> list1, List<String> list2) {
+ if (list1.size() > list2.size()) {
+ return verifySimilarity(list2, list1);
+ }
+
+ // NB: List2 is greater than list1 (or equal)
+ int pos = -1;
+ for (String s : list1) {
+ int curr = list2.indexOf(s);
+ if (curr > pos) {
+ list2.set(curr, "*"); // I invalidate the found element, example: "amm - amm"
+ pos = curr;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+}
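
`areSimilar` parses both strings as `Person` and then requires the surnames to match (case-insensitively) and the first names to be compatible, where a bare initial is allowed to match the corresponding full name. A small, hypothetical check (the names are made up):

```
import eu.dnetlib.pace.model.PersonComparatorUtils;

// Hypothetical usage sketch of the static comparison helper
public class PersonComparatorSketch {

    public static void main(String[] args) {
        // an initial is compatible with the corresponding full first name
        System.out.println(PersonComparatorUtils.areSimilar("Rossi, Mario", "M. Rossi"));       // true

        // different surnames and first names do not match
        System.out.println(PersonComparatorUtils.areSimilar("Rossi, Mario", "Bianchi, Paola")); // false
    }
}
```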
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/RowDataOrderingComparator.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/RowDataOrderingComparator.java
new file mode 100644
index 000000000..f0ded0570
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/RowDataOrderingComparator.java
@@ -0,0 +1,65 @@
+
+package eu.dnetlib.pace.model;
+
+import java.util.Comparator;
+
+import org.apache.spark.sql.Row;
+
+import eu.dnetlib.pace.clustering.NGramUtils;
+
+/**
+ * The Class RowDataOrderingComparator.
+ */
+public class RowDataOrderingComparator implements Comparator<Row> {
+
+ /** The comparator field. */
+ private final int comparatorField;
+ private final int identityFieldPosition;
+
+ /**
+ * Instantiates a new map document comparator.
+ *
+ * @param comparatorField
+ * the comparator field
+ */
+ public RowDataOrderingComparator(final int comparatorField, int identityFieldPosition) {
+ this.comparatorField = comparatorField;
+ this.identityFieldPosition = identityFieldPosition;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public int compare(final Row d1, final Row d2) {
+ if (d1 == null)
+ return d2 == null ? 0 : -1;
+ else if (d2 == null) {
+ return 1;
+ }
+
+ final String o1 = d1.getString(comparatorField);
+ final String o2 = d2.getString(comparatorField);
+
+ if (o1 == null)
+ return o2 == null ? 0 : -1;
+ else if (o2 == null) {
+ return 1;
+ }
+
+ final String to1 = NGramUtils.cleanupForOrdering(o1);
+ final String to2 = NGramUtils.cleanupForOrdering(o2);
+
+ int res = to1.compareTo(to2);
+ if (res == 0) {
+ res = o1.compareTo(o2);
+ if (res == 0) {
+ return d1.getString(identityFieldPosition).compareTo(d2.getString(identityFieldPosition));
+ }
+ }
+
+ return res;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala
new file mode 100644
index 000000000..b3f56bcdb
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala
@@ -0,0 +1,131 @@
+package eu.dnetlib.pace.model
+
+import eu.dnetlib.pace.config.{DedupConfig, Type}
+import eu.dnetlib.pace.util.{BlockProcessor, SparkReporter}
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.expressions._
+import org.apache.spark.sql.functions.{col, lit, udf}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Column, Dataset, Row, functions}
+
+import java.util.function.Predicate
+import java.util.stream.Collectors
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+case class SparkDeduper(conf: DedupConfig) extends Serializable {
+
+ val model: SparkModel = SparkModel(conf)
+
+ val dedup: (Dataset[Row] => Dataset[Row]) = df => {
+ df.transform(filterAndCleanup)
+ .transform(generateClustersWithCollect)
+ .transform(processBlocks)
+ }
+
+
+ val filterAndCleanup: (Dataset[Row] => Dataset[Row]) = df => {
+ val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
+ if (conf.blacklists.containsKey(fdef.getName)) {
+ res.withColumn(
+ fdef.getName + "_filtered",
+ filterColumnUDF(fdef).apply(new Column(fdef.getName))
+ )
+ } else {
+ res
+ }
+ })
+
+ df_with_filters
+ }
+
+ def filterColumnUDF(fdef: FieldDef): UserDefinedFunction = {
+ val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName)
+
+ if (blacklist == null) {
+ throw new IllegalArgumentException("Column: " + fdef.getName + " does not have any filter")
+ } else {
+ fdef.getType match {
+ case Type.List | Type.JSON =>
+ udf[Array[String], Array[String]](values => {
+ values.filter((v: String) => !blacklist.test(v))
+ })
+
+ case _ =>
+ udf[String, String](v => {
+ if (blacklist.test(v)) ""
+ else v
+ })
+ }
+ }
+ }
+
+ val generateClustersWithCollect: (Dataset[Row] => Dataset[Row]) = df_with_filters => {
+ var df_with_clustering_keys: Dataset[Row] = null
+
+ for ((cd, idx) <- conf.clusterings().zipWithIndex) {
+ val inputColumns = cd.getFields().foldLeft(Seq[Column]())((acc, fName) => {
+ val column = if (conf.blacklists.containsKey(fName))
+ Seq(col(fName + "_filtered"))
+ else
+ Seq(col(fName))
+
+ acc ++ column
+ })
+
+ // Add 'key' column with the value generated by the given clustering definition
+ val ds: Dataset[Row] = df_with_filters
+ .withColumn("clustering", lit(cd.getName + "::" + idx))
+ .withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(inputColumns: _*))))
+ // Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value
+ .withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(col(model.orderingFieldName), col(model.identifierFieldName))))
+
+ if (df_with_clustering_keys == null)
+ df_with_clustering_keys = ds
+ else
+ df_with_clustering_keys = df_with_clustering_keys.union(ds)
+ }
+
+ //TODO: analytics
+
+ val df_with_blocks = df_with_clustering_keys
+ // filter out rows with position exceeding the maxqueuesize parameter
+ .filter(col("position").leq(conf.getWf.getQueueMaxSize))
+ .groupBy("clustering", "key")
+ .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
+ .filter(functions.size(new Column("block")).gt(1))
+
+ df_with_blocks
+ }
+
+ def clusterValuesUDF(cd: ClusteringDef) = {
+ udf[mutable.WrappedArray[String], mutable.WrappedArray[Any]](values => {
+ values.flatMap(f => cd.clusteringFunction().apply(conf, Seq(f.toString).asJava).asScala)
+ })
+ }
+
+ val processBlocks: (Dataset[Row] => Dataset[Row]) = df => {
+ df.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
+ .withColumn("relations", processBlock(df.sqlContext.sparkContext).apply(new Column("block")))
+ .select(functions.explode(new Column("relations")).as("relation"))
+ }
+
+ def processBlock(implicit sc: SparkContext) = {
+ val accumulators = SparkReporter.constructAccumulator(conf, sc)
+
+ udf[Array[(String, String)], mutable.WrappedArray[Row]](block => {
+ val reporter = new SparkReporter(accumulators)
+
+ val mapDocuments = block.asJava.stream()
+ .sorted(new RowDataOrderingComparator(model.orderingFieldPosition, model.identityFieldPosition))
+ .limit(conf.getWf.getQueueMaxSize)
+ .collect(Collectors.toList[Row]())
+
+ new BlockProcessor(conf, model.identityFieldPosition, model.orderingFieldPosition).processSortedRows(mapDocuments, reporter)
+
+ reporter.getRelations.asScala.toArray
+ }).asNondeterministic()
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala
new file mode 100644
index 000000000..aa997c6e9
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala
@@ -0,0 +1,108 @@
+package eu.dnetlib.pace.model
+
+import com.jayway.jsonpath.{Configuration, JsonPath}
+import eu.dnetlib.pace.config.{DedupConfig, Type}
+import eu.dnetlib.pace.util.MapDocumentUtil
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+import org.apache.spark.sql.{Dataset, Row}
+
+import java.util.regex.Pattern
+import scala.collection.JavaConverters._
+
+case class SparkModel(conf: DedupConfig) {
+ private val URL_REGEX: Pattern = Pattern.compile("^\\s*(http|https|ftp)\\://.*")
+
+ private val CONCAT_REGEX: Pattern = Pattern.compile("\\|\\|\\|")
+
+ val identifierFieldName = "identifier"
+
+ val orderingFieldName = if (!conf.getWf.getOrderField.isEmpty) conf.getWf.getOrderField else identifierFieldName
+
+ val schema: StructType = {
+ // create an implicit identifier field
+ val identifier = new FieldDef()
+ identifier.setName(identifierFieldName)
+ identifier.setType(Type.String)
+
+ // Construct a Spark StructType representing the schema of the model
+ (Seq(identifier) ++ conf.getPace.getModel.asScala)
+ .foldLeft(
+ new StructType()
+ )((resType, fieldDef) => {
+ resType.add(fieldDef.getType match {
+ case Type.List | Type.JSON =>
+ StructField(fieldDef.getName, DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty)
+ case Type.DoubleArray =>
+ StructField(fieldDef.getName, DataTypes.createArrayType(DataTypes.DoubleType), true, Metadata.empty)
+ case _ =>
+ StructField(fieldDef.getName, DataTypes.StringType, true, Metadata.empty)
+ })
+ })
+
+
+ }
+
+ val identityFieldPosition: Int = schema.fieldIndex(identifierFieldName)
+
+ val orderingFieldPosition: Int = schema.fieldIndex(orderingFieldName)
+
+ val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => {
+ df.map(r => rowFromJson(r))(RowEncoder(schema))
+ }
+
+ def rowFromJson(json: String): Row = {
+ val documentContext =
+ JsonPath.using(Configuration.defaultConfiguration.addOptions(com.jayway.jsonpath.Option.SUPPRESS_EXCEPTIONS)).parse(json)
+ val values = new Array[Any](schema.size)
+
+ values(identityFieldPosition) = MapDocumentUtil.getJPathString(conf.getWf.getIdPath, documentContext)
+
+ schema.fieldNames.zipWithIndex.foldLeft(values) {
+ case ((res, (fname, index))) => {
+ val fdef = conf.getPace.getModelMap.get(fname)
+
+ if (fdef != null) {
+ res(index) = fdef.getType match {
+ case Type.String | Type.Int =>
+ MapDocumentUtil.truncateValue(
+ MapDocumentUtil.getJPathString(fdef.getPath, documentContext),
+ fdef.getLength
+ )
+
+ case Type.URL =>
+ var uv = MapDocumentUtil.getJPathString(fdef.getPath, documentContext)
+ if (!URL_REGEX.matcher(uv).matches)
+ uv = ""
+ uv
+
+ case Type.List | Type.JSON =>
+ MapDocumentUtil.truncateList(
+ MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
+ fdef.getSize
+ ).asScala
+
+ case Type.StringConcat =>
+ val jpaths = CONCAT_REGEX.split(fdef.getPath)
+
+ MapDocumentUtil.truncateValue(
+ jpaths
+ .map(jpath => MapDocumentUtil.getJPathString(jpath, documentContext))
+ .mkString(" "),
+ fdef.getLength
+ )
+
+ case Type.DoubleArray =>
+ MapDocumentUtil.getJPathArray(fdef.getPath, json)
+ }
+ }
+
+ res
+ }
+ }
+
+ new GenericRowWithSchema(values, schema)
+ }
+}
+
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AlwaysMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AlwaysMatch.java
new file mode 100644
index 000000000..4d31df5b3
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AlwaysMatch.java
@@ -0,0 +1,42 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+
+import com.wcohen.ss.AbstractStringDistance;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("alwaysMatch")
+public class AlwaysMatch extends AbstractComparator {
+
+ public AlwaysMatch(final Map<String, String> params) {
+ super(params, new com.wcohen.ss.JaroWinkler());
+ }
+
+ public AlwaysMatch(final double weight) {
+ super(weight, new com.wcohen.ss.JaroWinkler());
+ }
+
+ protected AlwaysMatch(final double weight, final AbstractStringDistance ssalgo) {
+ super(weight, ssalgo);
+ }
+
+ @Override
+ public double compare(final Object a, final Object b, final Config conf) {
+ return 1.0;
+ }
+
+ @Override
+ public double getWeight() {
+ return super.weight;
+ }
+
+ @Override
+ protected double normalize(final double d) {
+ return d;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AuthorsMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AuthorsMatch.java
new file mode 100644
index 000000000..5c6939e60
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AuthorsMatch.java
@@ -0,0 +1,157 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.wcohen.ss.AbstractStringDistance;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.model.Person;
+import eu.dnetlib.pace.tree.support.AbstractListComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("authorsMatch")
+public class AuthorsMatch extends AbstractListComparator {
+
+ Map<String, String> params;
+
+ private double SURNAME_THRESHOLD;
+ private double NAME_THRESHOLD;
+ private double FULLNAME_THRESHOLD;
+ private String MODE; // full or surname
+ private int SIZE_THRESHOLD;
+ private String TYPE; // count or percentage
+ private int common;
+
+ public AuthorsMatch(Map<String, String> params) {
+ super(params, new com.wcohen.ss.JaroWinkler());
+ this.params = params;
+
+ MODE = params.getOrDefault("mode", "full");
+ SURNAME_THRESHOLD = Double.parseDouble(params.getOrDefault("surname_th", "0.95"));
+ NAME_THRESHOLD = Double.parseDouble(params.getOrDefault("name_th", "0.95"));
+ FULLNAME_THRESHOLD = Double.parseDouble(params.getOrDefault("fullname_th", "0.9"));
+ SIZE_THRESHOLD = Integer.parseInt(params.getOrDefault("size_th", "20"));
+ TYPE = params.getOrDefault("type", "percentage");
+ common = 0;
+ }
+
+ protected AuthorsMatch(double w, AbstractStringDistance ssalgo) {
+ super(w, ssalgo);
+ }
+
+ @Override
+ public double compare(final List<String> a, final List<String> b, final Config conf) {
+
+ if (a.isEmpty() || b.isEmpty())
+ return -1;
+
+ if (a.size() > SIZE_THRESHOLD || b.size() > SIZE_THRESHOLD)
+ return 1.0;
+
+ List<Person> aList = a.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
+ List<Person> bList = b.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
+
+ common = 0;
+ // compare each element of List1 with each element of List2
+ for (Person p1 : aList)
+
+ for (Person p2 : bList) {
+
+ // both persons are inaccurate
+ if (!p1.isAccurate() && !p2.isAccurate()) {
+ // compare just normalized fullnames
+ String fullname1 = normalization(
+ p1.getNormalisedFullname().isEmpty() ? p1.getOriginal() : p1.getNormalisedFullname());
+ String fullname2 = normalization(
+ p2.getNormalisedFullname().isEmpty() ? p2.getOriginal() : p2.getNormalisedFullname());
+
+ if (ssalgo.score(fullname1, fullname2) > FULLNAME_THRESHOLD) {
+ common += 1;
+ break;
+ }
+ }
+
+ // one person is inaccurate
+ if (p1.isAccurate() ^ p2.isAccurate()) {
+ // prepare data
+ // data for the accurate person
+ String name = normalization(
+ p1.isAccurate() ? p1.getNormalisedFirstName() : p2.getNormalisedFirstName());
+ String surname = normalization(
+ p1.isAccurate() ? p1.getNormalisedSurname() : p2.getNormalisedSurname());
+
+ // data for the inaccurate person
+ String fullname = normalization(
+ p1.isAccurate()
+ ? ((p2.getNormalisedFullname().isEmpty()) ? p2.getOriginal() : p2.getNormalisedFullname())
+ : (p1.getNormalisedFullname().isEmpty() ? p1.getOriginal() : p1.getNormalisedFullname()));
+
+ if (fullname.contains(surname)) {
+ if (MODE.equals("full")) {
+ if (fullname.contains(name)) {
+ common += 1;
+ break;
+ }
+ } else { // MODE equals "surname"
+ common += 1;
+ break;
+ }
+ }
+ }
+
+ // both persons are accurate
+ if (p1.isAccurate() && p2.isAccurate()) {
+
+ if (compareSurname(p1, p2)) {
+ if (MODE.equals("full")) {
+ if (compareFirstname(p1, p2)) {
+ common += 1;
+ break;
+ }
+ } else { // MODE equals "surname"
+ common += 1;
+ break;
+ }
+ }
+
+ }
+
+ }
+
+ // normalization factor to compute the score
+ int normFactor = aList.size() == bList.size() ? aList.size() : (aList.size() + bList.size() - common);
+
+ if (TYPE.equals("percentage")) {
+ return (double) common / normFactor;
+ } else {
+ return (double) common;
+ }
+ }
+
+ public boolean compareSurname(Person p1, Person p2) {
+ return ssalgo
+ .score(
+ normalization(p1.getNormalisedSurname()), normalization(p2.getNormalisedSurname())) > SURNAME_THRESHOLD;
+ }
+
+ public boolean compareFirstname(Person p1, Person p2) {
+
+ if (p1.getNormalisedFirstName().length() <= 2 || p2.getNormalisedFirstName().length() <= 2) {
+ if (firstLC(p1.getNormalisedFirstName()).equals(firstLC(p2.getNormalisedFirstName())))
+ return true;
+ }
+
+ return ssalgo
+ .score(
+ normalization(p1.getNormalisedFirstName()),
+ normalization(p2.getNormalisedFirstName())) > NAME_THRESHOLD;
+ }
+
+ public String normalization(String s) {
+ return normalize(utf8(cleanup(s)));
+ }
+
+}
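
A hedged usage sketch for the comparator above (hypothetical author strings; the `Config` argument is not consulted inside `compare`, so `null` is passed here). With `mode=surname` and `type=count` the score is the number of author pairs considered equivalent:

```
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import eu.dnetlib.pace.tree.AuthorsMatch;

// Hypothetical sketch, not part of the module
public class AuthorsMatchSketch {

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("mode", "surname"); // compare surnames only
        params.put("type", "count");   // return the raw number of common authors

        AuthorsMatch comparator = new AuthorsMatch(params);

        double common = comparator
            .compare(
                Arrays.asList("Rossi, Mario", "Bianchi, Paola"),
                Arrays.asList("Mario Rossi", "Verdi, Anna"),
                null);

        System.out.println(common); // expected: 1.0, only the first author is shared
    }
}
```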
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/CityMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/CityMatch.java
new file mode 100644
index 000000000..1d898ad83
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/CityMatch.java
@@ -0,0 +1,48 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+import java.util.Set;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractStringComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("cityMatch")
+public class CityMatch extends AbstractStringComparator {
+
+ private Map<String, String> params;
+
+ public CityMatch(Map<String, String> params) {
+ super(params);
+ this.params = params;
+ }
+
+ @Override
+ public double distance(final String a, final String b, final Config conf) {
+
+ String ca = cleanup(a);
+ String cb = cleanup(b);
+
+ ca = normalize(ca);
+ cb = normalize(cb);
+
+ ca = filterAllStopWords(ca);
+ cb = filterAllStopWords(cb);
+
+ Set<String> cities1 = getCities(ca, Integer.parseInt(params.getOrDefault("windowSize", "4")));
+ Set<String> cities2 = getCities(cb, Integer.parseInt(params.getOrDefault("windowSize", "4")));
+
+ Set<String> codes1 = citiesToCodes(cities1);
+ Set<String> codes2 = citiesToCodes(cities2);
+
+ // if no cities are detected, the comparator gives 1.0
+ if (codes1.isEmpty() && codes2.isEmpty())
+ return 1.0;
+ else {
+ if (codes1.isEmpty() ^ codes2.isEmpty())
+ return -1; // undefined if one of the two has no cities
+ return commonElementsPercentage(codes1, codes2);
+ }
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/CosineSimilarity.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/CosineSimilarity.java
new file mode 100644
index 000000000..d255612ba
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/CosineSimilarity.java
@@ -0,0 +1,47 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("cosineSimilarity")
+public class CosineSimilarity extends AbstractComparator {
+
+ Map<String, String> params;
+
+ public CosineSimilarity(Map<String, String> params) {
+ super(params);
+ }
+
+ @Override
+ public double compare(Object a, Object b, Config config) {
+ return compare((double[]) a, (double[]) b, config);
+ }
+
+ public double compare(final double[] a, final double[] b, final Config conf) {
+
+ if (a.length == 0 || b.length == 0)
+ return -1;
+
+ return cosineSimilarity(a, b);
+ }
+
+ double cosineSimilarity(double[] a, double[] b) {
+ double dotProduct = 0;
+ double normASum = 0;
+ double normBSum = 0;
+
+ for (int i = 0; i < a.length; i++) {
+ dotProduct += a[i] * b[i];
+ normASum += a[i] * a[i];
+ normBSum += b[i] * b[i];
+ }
+
+ double eucledianDist = Math.sqrt(normASum) * Math.sqrt(normBSum);
+ return dotProduct / eucledianDist;
+ }
+
+}
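
The score is the classical cosine similarity, dot(a, b) / (||a|| * ||b||), with -1 reserved for missing vectors. A quick, hypothetical sanity check (the class name and vectors are illustrative; the `Config` argument is not used by `compare`):

```
import java.util.HashMap;

import eu.dnetlib.pace.tree.CosineSimilarity;

// Hypothetical sketch exercising the comparator on toy vectors
public class CosineSketch {

    public static void main(String[] args) {
        CosineSimilarity cosine = new CosineSimilarity(new HashMap<>());

        double[] a = { 1.0, 0.0 };
        double[] b = { 1.0, 0.0 };
        double[] c = { 0.0, 1.0 };

        System.out.println(cosine.compare(a, b, null)); // 1.0, same direction
        System.out.println(cosine.compare(a, c, null)); // 0.0, orthogonal vectors
        System.out.println(cosine.compare(new double[0], a, null)); // -1.0, missing field
    }
}
```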
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/DoiExactMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/DoiExactMatch.java
new file mode 100644
index 000000000..d3c5bc10d
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/DoiExactMatch.java
@@ -0,0 +1,27 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+/**
+ * The Class ExactMatch.
+ *
+ * @author claudio
+ */
+@ComparatorClass("doiExactMatch")
+public class DoiExactMatch extends ExactMatchIgnoreCase {
+
+ public final String PREFIX = "(http:\\/\\/dx\\.doi\\.org\\/)|(doi:)";
+
+ public DoiExactMatch(final Map params) {
+ super(params);
+ }
+
+ @Override
+ protected String toString(final Object f) {
+ return super.toString(f).replaceAll(PREFIX, "");
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/DomainExactMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/DomainExactMatch.java
new file mode 100644
index 000000000..c28274652
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/DomainExactMatch.java
@@ -0,0 +1,30 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("domainExactMatch")
+public class DomainExactMatch extends ExactMatchIgnoreCase {
+
+ public DomainExactMatch(final Map params) {
+ super(params);
+ }
+
+ @Override
+ protected String toString(final Object f) {
+
+ try {
+ return asUrl(super.toString(f)).getHost();
+ } catch (MalformedURLException e) {
+ return "";
+ }
+ }
+
+ private URL asUrl(final String value) throws MalformedURLException {
+ return new URL(value);
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/ExactMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/ExactMatch.java
new file mode 100644
index 000000000..35357c553
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/ExactMatch.java
@@ -0,0 +1,44 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+
+import com.wcohen.ss.AbstractStringDistance;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractStringComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("exactMatch")
+public class ExactMatch extends AbstractStringComparator {
+
+ public ExactMatch(Map<String, String> params) {
+ super(params, new com.wcohen.ss.JaroWinkler());
+ }
+
+ public ExactMatch(final double weight) {
+ super(weight, new com.wcohen.ss.JaroWinkler());
+ }
+
+ protected ExactMatch(final double weight, final AbstractStringDistance ssalgo) {
+ super(weight, ssalgo);
+ }
+
+ @Override
+ public double distance(final String a, final String b, final Config conf) {
+ if (a.isEmpty() || b.isEmpty()) {
+ return -1.0; // return -1 if a field is missing
+ }
+ return a.equals(b) ? 1.0 : 0;
+ }
+
+ @Override
+ public double getWeight() {
+ return super.weight;
+ }
+
+ @Override
+ protected double normalize(final double d) {
+ return d;
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/ExactMatchIgnoreCase.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/ExactMatchIgnoreCase.java
new file mode 100644
index 000000000..85c57ad40
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/ExactMatchIgnoreCase.java
@@ -0,0 +1,29 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractStringComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("exactMatchIgnoreCase")
+public class ExactMatchIgnoreCase extends AbstractStringComparator {
+
+ public ExactMatchIgnoreCase(Map<String, String> params) {
+ super(params);
+ }
+
+ @Override
+ public double compare(String a, String b, final Config conf) {
+
+ if (a.isEmpty() || b.isEmpty())
+ return -1;
+
+ return a.equalsIgnoreCase(b) ? 1 : 0;
+ }
+
+ protected String toString(final Object object) {
+ return toFirstString(object);
+ }
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/InstanceTypeMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/InstanceTypeMatch.java
new file mode 100644
index 000000000..238cb16ce
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/InstanceTypeMatch.java
@@ -0,0 +1,80 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractListComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("instanceTypeMatch")
+public class InstanceTypeMatch extends AbstractListComparator {
+
+ final Map translationMap = new HashMap<>();
+
+ public InstanceTypeMatch(Map params) {
+ super(params);
+
+ // jolly types
+ translationMap.put("Conference object", "*");
+ translationMap.put("Other literature type", "*");
+ translationMap.put("Unknown", "*");
+
+ // article types
+ translationMap.put("Article", "Article");
+ translationMap.put("Data Paper", "Article");
+ translationMap.put("Software Paper", "Article");
+ translationMap.put("Preprint", "Article");
+
+ // thesis types
+ translationMap.put("Thesis", "Thesis");
+ translationMap.put("Master thesis", "Thesis");
+ translationMap.put("Bachelor thesis", "Thesis");
+ translationMap.put("Doctoral thesis", "Thesis");
+ }
+
+ @Override
+ public double compare(final List<String> a, final List<String> b, final Config conf) {
+
+ if (a == null || b == null) {
+ return -1;
+ }
+
+ if (a.isEmpty() || b.isEmpty()) {
+ return -1;
+ }
+
+ final Set<String> ca = a.stream().map(this::translate).collect(Collectors.toSet());
+ final Set<String> cb = b.stream().map(this::translate).collect(Collectors.toSet());
+
+ // if at least one is a jolly type, it must produce a match
+ if (ca.contains("*") || cb.contains("*"))
+ return 1.0;
+
+ int incommon = Sets.intersection(ca, cb).size();
+
+ // if at least one is in common, it must produce a match
+ return incommon >= 1 ? 1 : 0;
+ }
+
+ public String translate(String term) {
+ return translationMap.getOrDefault(term, term);
+ }
+
+ @Override
+ public double getWeight() {
+ return super.weight;
+ }
+
+ @Override
+ protected double normalize(final double d) {
+ return d;
+ }
+
+}
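
The translation map above collapses specific instance types onto broader classes, with "*" acting as a wildcard that matches anything. A small, hypothetical check (the `Config` argument is not used by `compare`):

```
import java.util.Arrays;
import java.util.HashMap;

import eu.dnetlib.pace.tree.InstanceTypeMatch;

// Hypothetical sketch of the instance-type comparison
public class InstanceTypeMatchSketch {

    public static void main(String[] args) {
        InstanceTypeMatch comparator = new InstanceTypeMatch(new HashMap<>());

        // Preprint is translated to Article, so the two lists share a class
        System.out.println(comparator.compare(Arrays.asList("Article"), Arrays.asList("Preprint"), null)); // 1.0

        // Unknown maps to the wildcard "*" and always matches
        System.out.println(comparator.compare(Arrays.asList("Doctoral thesis"), Arrays.asList("Unknown"), null)); // 1.0

        // no common class and no wildcard
        System.out.println(comparator.compare(Arrays.asList("Article"), Arrays.asList("Thesis"), null)); // 0.0
    }
}
```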
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JaroWinkler.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JaroWinkler.java
new file mode 100644
index 000000000..2cb411d26
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JaroWinkler.java
@@ -0,0 +1,46 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+
+import com.wcohen.ss.AbstractStringDistance;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractStringComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+//case class JaroWinkler(w: Double) extends SecondStringDistanceAlgo(w, new com.wcohen.ss.JaroWinkler())
+@ComparatorClass("jaroWinkler")
+public class JaroWinkler extends AbstractStringComparator {
+
+ public JaroWinkler(Map<String, String> params) {
+ super(params, new com.wcohen.ss.JaroWinkler());
+ }
+
+ public JaroWinkler(double weight) {
+ super(weight, new com.wcohen.ss.JaroWinkler());
+ }
+
+ protected JaroWinkler(double weight, AbstractStringDistance ssalgo) {
+ super(weight, ssalgo);
+ }
+
+ @Override
+ public double distance(String a, String b, final Config conf) {
+ String ca = cleanup(a);
+ String cb = cleanup(b);
+
+ return normalize(ssalgo.score(ca, cb));
+ }
+
+ @Override
+ public double getWeight() {
+ return super.weight;
+ }
+
+ @Override
+ protected double normalize(double d) {
+ return d;
+ }
+
+}
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JaroWinklerNormalizedName.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JaroWinklerNormalizedName.java
new file mode 100644
index 000000000..576b9281d
--- /dev/null
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JaroWinklerNormalizedName.java
@@ -0,0 +1,74 @@
+
+package eu.dnetlib.pace.tree;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.wcohen.ss.AbstractStringDistance;
+
+import eu.dnetlib.pace.config.Config;
+import eu.dnetlib.pace.tree.support.AbstractStringComparator;
+import eu.dnetlib.pace.tree.support.ComparatorClass;
+
+@ComparatorClass("jaroWinklerNormalizedName")
+public class JaroWinklerNormalizedName extends AbstractStringComparator {
+
+ private Map