indentation fixed

Michele De Bonis 2024-10-10 10:48:51 +02:00
parent efe111de82
commit 46db6b02d3
5 changed files with 479 additions and 427 deletions

View File: DedupLocalTestUtils.java

@@ -1,12 +1,11 @@
 package eu.dnetlib.dhp.oa.dedup.local;
 
-import com.cloudera.com.fasterxml.jackson.core.JsonFactory;
-import com.cloudera.com.fasterxml.jackson.databind.JsonNode;
-import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.pace.config.DedupConfig;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.collections4.IteratorUtils;
 import org.apache.commons.io.IOUtils;
@@ -16,35 +15,52 @@ import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.spark_project.guava.hash.Hashing;
+import com.cloudera.com.fasterxml.jackson.core.JsonFactory;
+import com.cloudera.com.fasterxml.jackson.databind.JsonNode;
+import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.pace.config.DedupConfig;
 import scala.collection.JavaConverters;
 import scala.collection.convert.Wrappers;
 import scala.collection.mutable.ArrayBuffer;
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
 
 public abstract class DedupLocalTestUtils {
 
     public static String prepareTable(Row doc) {
         StringBuilder ret = new StringBuilder("<table>");
-        for(String fieldName: doc.schema().fieldNames()) {
+        for (String fieldName : doc.schema().fieldNames()) {
             Object value = doc.getAs(fieldName);
-            if(value.getClass() == String.class){
+            if (value.getClass() == String.class) {
                 ret.append("<tr><th>").append(fieldName).append("</th><td>").append(value).append("</td></tr>");
-            }
-            else if(value.getClass() == Wrappers.JListWrapper.class) {
-                List<String> values = IteratorUtils.toList(JavaConverters.asJavaIteratorConverter(((Wrappers.JListWrapper<String>) value).iterator()).asJava())
+            } else if (value.getClass() == Wrappers.JListWrapper.class) {
+                List<String> values = IteratorUtils
+                    .toList(
+                        JavaConverters
+                            .asJavaIteratorConverter(((Wrappers.JListWrapper<String>) value).iterator())
+                            .asJava())
                     .stream()
                     .map(DedupLocalTestUtils::takeValue)
                     .collect(Collectors.toList());
-                ret.append("<tr><th>").append(fieldName).append("</th><td>[").append(String.join(";", values)).append("]</td></tr>");
-            }
-            else if(value.getClass() == ArrayBuffer.class){
-                List<String> values = new ArrayList<>(IteratorUtils.toList(JavaConverters.asJavaIteratorConverter(((ArrayBuffer<String>) value).iterator()).asJava()));
-                ret.append("<tr><th>").append(fieldName).append("</th><td>[").append(String.join(";", values)).append("]</td></tr>");
+                ret
+                    .append("<tr><th>")
+                    .append(fieldName)
+                    .append("</th><td>[")
+                    .append(String.join(";", values))
+                    .append("]</td></tr>");
+            } else if (value.getClass() == ArrayBuffer.class) {
+                List<String> values = new ArrayList<>(IteratorUtils
+                    .toList(JavaConverters.asJavaIteratorConverter(((ArrayBuffer<String>) value).iterator()).asJava()));
+                ret
+                    .append("<tr><th>")
+                    .append(fieldName)
+                    .append("</th><td>[")
+                    .append(String.join(";", values))
+                    .append("]</td></tr>");
             }
         }
@@ -56,9 +72,9 @@ public abstract class DedupLocalTestUtils {
     }
 
     protected static String fileToString(String filePath) throws IOException {
-        Path path=new Path(filePath);
+        Path path = new Path(filePath);
         FileSystem fs = FileSystem.get(new Configuration());
-        BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
         try {
             return String.join("", br.lines().collect(Collectors.toList()));
         } finally {
@@ -66,15 +82,26 @@ public abstract class DedupLocalTestUtils {
         }
     }
 
-    public static void prepareGraphParams(Dataset<Row> entities, Dataset<Relation> simRels, String filePath, String templateFilePath) {
+    public static void prepareGraphParams(Dataset<Row> entities, Dataset<Relation> simRels, String filePath,
+        String templateFilePath) {
 
         List<String> vertexes = entities.toJavaRDD().map(r -> r.getAs("identifier").toString()).collect();
 
-        List<Node> nodes = entities.toJavaRDD().map(e -> new Node(e.getAs("identifier").toString(), vertexes.indexOf(e.getAs("identifier").toString()), prepareTable(e))).collect();
+        List<Node> nodes = entities
+            .toJavaRDD()
+            .map(
+                e -> new Node(e.getAs("identifier").toString(), vertexes.indexOf(e.getAs("identifier").toString()),
+                    prepareTable(e)))
+            .collect();
 
-        List<Edge> edges = simRels.toJavaRDD().collect().stream().map(sr -> new Edge(vertexes.indexOf(sr.getSource()), vertexes.indexOf(sr.getTarget()))).collect(Collectors.toList());
+        List<Edge> edges = simRels
+            .toJavaRDD()
+            .collect()
+            .stream()
+            .map(sr -> new Edge(vertexes.indexOf(sr.getSource()), vertexes.indexOf(sr.getTarget())))
+            .collect(Collectors.toList());
 
-        try(FileWriter fw = new FileWriter(filePath)) {
+        try (FileWriter fw = new FileWriter(filePath)) {
             String fullText = IOUtils.toString(new FileReader(templateFilePath));
 
             String s = fullText
@@ -130,7 +157,7 @@ public abstract class DedupLocalTestUtils {
     }
 }
 
-class Node implements Serializable{
+class Node implements Serializable {
     String label;
     int id;
     String title;
@@ -166,7 +193,7 @@ class Node implements Serializable{
     }
 }
 
-class Edge implements Serializable{
+class Edge implements Serializable {
     int from;
     int to;

View File: SparkDedupLocalTest.java

@@ -1,19 +1,15 @@
 package eu.dnetlib.dhp.oa.dedup.local;
 
-import com.google.common.collect.Lists;
-import com.kwartile.lib.cc.ConnectedComponent;
-import eu.dnetlib.dhp.oa.dedup.DedupUtility;
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.model.SparkDeduper;
-import eu.dnetlib.pace.model.SparkModel;
-import eu.dnetlib.pace.tree.support.TreeProcessor;
+import java.awt.*;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.List;
+import java.util.stream.Stream;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.FlatMapGroupsFunction;
@@ -26,20 +22,27 @@ import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.platform.commons.util.StringUtils;
 import org.mockito.junit.jupiter.MockitoExtension;
+import com.google.common.collect.Lists;
+import com.kwartile.lib.cc.ConnectedComponent;
+import eu.dnetlib.dhp.oa.dedup.DedupUtility;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.SparkDeduper;
+import eu.dnetlib.pace.model.SparkModel;
+import eu.dnetlib.pace.tree.support.TreeProcessor;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.collection.JavaConversions;
 import scala.collection.mutable.WrappedArray;
-import java.awt.*;
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.*;
-import java.util.stream.Stream;
 
 @ExtendWith(MockitoExtension.class)
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -50,12 +53,18 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
     static JavaSparkContext context;
 
     final String entitiesPath = Paths
-        .get(Objects.requireNonNull(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/entities/publication")).toURI())
+        .get(
+            Objects
+                .requireNonNull(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/entities/publication"))
+                .toURI())
         .toFile()
         .getAbsolutePath();
 
     final String dedupConfPath = Paths
-        .get(Objects.requireNonNull(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")).toURI())
+        .get(
+            Objects
+                .requireNonNull(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))
+                .toURI())
         .toFile()
         .getAbsolutePath();
@@ -85,7 +94,7 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
     }
 
-    @Test //full deduplication workflow test
+    @Test // full deduplication workflow test
     @Disabled
     public void deduplicationTest() {
@@ -99,9 +108,11 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
         Dataset<Relation> simRels = entities
             .transform(deduper.dedup())
             .distinct()
-            .map((MapFunction<Row, Relation>) t ->
-                    DedupUtility.createSimRel(t.getStruct(0).getString(0), t.getStruct(0).getString(1), config.getWf().getEntityType()), Encoders.bean(Relation.class)
-            );
+            .map(
+                (MapFunction<Row, Relation>) t -> DedupUtility
+                    .createSimRel(
+                        t.getStruct(0).getString(0), t.getStruct(0).getString(1), config.getWf().getEntityType()),
+                Encoders.bean(Relation.class));
 
         long simrels_time = System.currentTimeMillis() - before_simrels;
@@ -127,11 +138,10 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
             .withColumn("source", hashUDF.apply(functions.col("source")))
             .withColumn("target", hashUDF.apply(functions.col("target")));
 
-        for(Relation r: simRels.toJavaRDD().collect()) {
+        for (Relation r : simRels.toJavaRDD().collect()) {
             System.out.println(r.getSource() + " ---> " + r.getTarget());
         }
 
         // resolve connected components
         // ("vertexId", "groupId")
         Dataset<Row> cliques = ConnectedComponent
@@ -155,18 +165,23 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
                 return res.iterator();
             }, Encoders.bean(Relation.class));
 
         long mergerels_time = System.currentTimeMillis() - before_mergerels;
         long mergerels_number = mergeRels.count();
 
         long before_dedupentity = System.currentTimeMillis();
 
-        final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(config.getWf().getSubEntityValue()));
+        final Class<OafEntity> clazz = ModelSupport.entityTypes
+            .get(EntityType.valueOf(config.getWf().getSubEntityValue()));
         final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
         final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
 
-        Dataset<Row> kryoEntities = spark.read().schema(Encoders.bean(clazz).schema()).json(entitiesPath).as(beanEncoder).map(
+        Dataset<Row> kryoEntities = spark
+            .read()
+            .schema(Encoders.bean(clazz).schema())
+            .json(entitiesPath)
+            .as(beanEncoder)
+            .map(
                 (MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
                     return new Tuple2<>(entity.getId(), entity);
                 },
@@ -203,7 +218,7 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
                     } else {
                         cliques_.add(entity);
 
-                        if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { //max acceptance date
+                        if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { // max acceptance date
                             if (Result.class.isAssignableFrom(entity.getClass())) {
                                 Result result = (Result) entity;
                                 if (result.getDateofacceptance() != null
@@ -251,7 +266,7 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
     }
 
-    @Test //test the match between two JSON
+    @Test // test the match between two JSON
     @Disabled
     public void matchTest() {
@@ -261,13 +276,13 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
         Row a = model.rowFromJson(json1);
         Row b = model.rowFromJson(json2);
 
-        boolean result = new TreeProcessor(config).compare(a,b);
+        boolean result = new TreeProcessor(config).compare(a, b);
 
         System.out.println("Tree Processor Result = " + result);
     }
 
-    @Test //test the keys between two JSON
+    @Test // test the keys between two JSON
     @Disabled
     public void blockingTest() {
@@ -279,11 +294,12 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
         List<Row> rows = Lists.newArrayList(a, b);
 
-        Dataset<Row> rowsDS = spark.createDataset(rows, RowEncoder.apply(model.schema()))
+        Dataset<Row> rowsDS = spark
+            .createDataset(rows, RowEncoder.apply(model.schema()))
             .transform(deduper.filterAndCleanup())
             .transform(deduper.generateClustersWithCollect());
 
-        for (Row r: rowsDS.toJavaRDD().collect()) {
+        for (Row r : rowsDS.toJavaRDD().collect()) {
             System.out.println("block key = " + r.get(0));
             System.out.println("block size = " + r.get(1));
         }
@@ -296,7 +312,15 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
                 entities,
                 simRels,
                 "/tmp/graph.html",
-                Paths.get(Objects.requireNonNull(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/visualization_tools/graph_template.html")).toURI()).toFile().getAbsolutePath());
+                Paths
+                    .get(
+                        Objects
+                            .requireNonNull(
+                                SparkDedupLocalTest.class
+                                    .getResource("/eu/dnetlib/dhp/dedup/visualization_tools/graph_template.html"))
+                            .toURI())
+                    .toFile()
+                    .getAbsolutePath());
             Desktop.getDesktop().browse(new File("/tmp/graph.html").toURI());
         } catch (Exception e) {
             e.printStackTrace();

View File: ORCIDAuthorMatchersTest.scala

@@ -31,6 +31,7 @@ class ORCIDAuthorMatchersTest {
     assertTrue(matchOrderedTokenAndAbbreviations("孙林 Sun Lin", "Sun Lin"))
     // assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented
   }
+
   @Test def testDocumentationNames(): Unit = {
     assertTrue(matchOrderedTokenAndAbbreviations("James C. A. Miller-Jones", "James Antony Miller-Jones"))
   }