diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java
new file mode 100644
index 000000000..7c1e6550e
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java
@@ -0,0 +1,173 @@
+package eu.dnetlib.dhp.oa.dedup;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class SparkCollectSimRels extends AbstractSparkAction {
+
+	private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
+
+	// similarity groups and group definitions, as loaded from the dedup database
+	Dataset<Row> simGroupsDS;
+	Dataset<Row> groupsDS;
+
+	public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset<Row> simGroupsDS, Dataset<Row> groupsDS) {
+		super(parser, spark);
+		this.simGroupsDS = simGroupsDS;
+		this.groupsDS = groupsDS;
+	}
+
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkBlockStats.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
+		parser.parseArgument(args);
+
+		SparkConf conf = new SparkConf();
+
+		final String dbUrl = parser.get("postgresUrl");
+		final String dbUser = parser.get("postgresUser");
+		final String dbPassword = parser.get("postgresPassword");
+
+		SparkSession spark = getSparkSession(conf);
+
+		DataFrameReader readOptions = spark.read()
+			.format("jdbc")
+			.option("url", dbUrl)
+			.option("user", dbUser)
+			.option("password", dbPassword);
+
+		new SparkCollectSimRels(
+			parser,
+			spark,
+			readOptions.option("dbtable", "similarity_groups").load(),
+			readOptions.option("dbtable", "groups").load()
+		).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
+
+	@Override
+	void run(ISLookUpService isLookUpService) {
+
+		// read oozie parameters
+		final String isLookUpUrl = parser.get("isLookUpUrl");
+		final String actionSetId = parser.get("actionSetId");
+		final String workingPath = parser.get("workingPath");
+		final int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(NUM_PARTITIONS);
+		final String dbUrl = parser.get("postgresUrl");
+		final String dbUser = parser.get("postgresUser");
+
+		log.info("numPartitions: '{}'", numPartitions);
+		log.info("isLookUpUrl: '{}'", isLookUpUrl);
+		log.info("actionSetId: '{}'", actionSetId);
+		log.info("workingPath: '{}'", workingPath);
+		log.info("postgresUser: {}", dbUser);
+		log.info("postgresUrl: {}", dbUrl);
+		log.info("postgresPassword: xxx");
+
+		// group the similarity assertions by group id: (groupId, list of entity ids)
+		JavaPairRDD<String, List<String>> similarityGroup =
+			simGroupsDS
+				.toJavaRDD()
+				.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
+				.groupByKey()
+				.mapToPair(i -> new Tuple2<>(i._1(), StreamSupport.stream(i._2().spliterator(), false)
+					.collect(Collectors.toList())));
+
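+		// associate each group id with the entity type of its group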
+		JavaPairRDD<String, String> groupIds =
+			groupsDS
+				.toJavaRDD()
+				.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
+
+		// keep only similarity groups with a known entity type: ((groupId, entityType), ids)
+		JavaRDD<Tuple2<Tuple2<String, String>, List<String>>> groups = similarityGroup
+			.leftOuterJoin(groupIds)
+			.filter(g -> g._2()._2().isPresent())
+			.map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
+
+		// turn each group into a star of isSimilarTo relations centered on its first element
+		JavaRDD<Relation> relations = groups.flatMap(g -> {
+			String firstId = g._2().get(0);
+			List<Relation> rels = new ArrayList<>();
+
+			for (String id : g._2()) {
+				if (!firstId.equals(id))
+					rels.add(createSimRel(firstId, id, g._1()._2()));
+			}
+
+			return rels.iterator();
+		});
+
+		Dataset<Relation> resultRelations = spark.createDataset(
+			relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
+			Encoders.bean(Relation.class)
+		).repartition(numPartitions);
+
+		Dataset<Relation> organizationRelations = spark.createDataset(
+			relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
+			Encoders.bean(Relation.class)
+		).repartition(numPartitions);
+
+		savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
+		// the same result relations are appended to the simrel path of every result subtype
+		savePostgresRelation(resultRelations, workingPath, actionSetId, "publication");
+		savePostgresRelation(resultRelations, workingPath, actionSetId, "software");
+		savePostgresRelation(resultRelations, workingPath, actionSetId, "otherresearchproduct");
+		savePostgresRelation(resultRelations, workingPath, actionSetId, "dataset");
+
+	}
+
+	private Relation createSimRel(String source, String target, String entity) {
+		final Relation r = new Relation();
+		r.setSubRelType("dedupSimilarity");
+		r.setRelClass("isSimilarTo");
+		r.setDataInfo(new DataInfo());
+
+		switch (entity) {
+			case "result":
+				r.setSource("50|" + source);
+				r.setTarget("50|" + target);
+				r.setRelType("resultResult");
+				break;
+			case "organization":
+				r.setSource("20|" + source);
+				r.setTarget("20|" + target);
+				r.setRelType("organizationOrganization");
+				break;
+			default:
+				throw new IllegalArgumentException("unmanaged entity type: " + entity);
+		}
+		return r;
+	}
+
+	private void savePostgresRelation(Dataset<Relation> newRelations, String workingPath, String actionSetId, String entityType) {
+		newRelations
+			.write()
+			.mode(SaveMode.Append)
+			.parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
+	}
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index 6d625cd11..ce6226dde 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -104,15 +104,13 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 				.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
 				.mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<>(hash(s), s));
 
-			final RDD<Edge<String>> edgeRdd = spark
-				.read()
-				.textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
-				.map(
-					(MapFunction<String, Relation>) r -> OBJECT_MAPPER.readValue(r, Relation.class),
-					Encoders.bean(Relation.class))
-				.javaRDD()
-				.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
-				.rdd();
+			final RDD<Edge<String>> edgeRdd = spark
+				.read()
+				.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
+				.as(Encoders.bean(Relation.class))
+				.javaRDD()
+				.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
+				.rdd();
 
 			final Dataset<Relation> mergeRels = spark
 				.createDataset(
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index b3ee47bfc..babccefb4 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -100,12 +100,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 				.repartition(numPartitions);
 
 			// create relations by comparing only elements in the same group
-			Deduper
-				.computeRelations(sc, blocks, dedupConf)
-				.map(t -> createSimRel(t._1(), t._2(), entity))
-				.repartition(numPartitions)
-				.map(r -> OBJECT_MAPPER.writeValueAsString(r))
-				.saveAsTextFile(outputPath);
+			spark.createDataset(
+				Deduper
+					.computeRelations(sc, blocks, dedupConf)
+					.map(t -> createSimRel(t._1(), t._2(), entity))
+					.repartition(numPartitions)
+					.rdd(),
+				Encoders.bean(Relation.class)
+			)
+				.write()
+				.mode(SaveMode.Append)
+				.parquet(outputPath);
 		}
 	}
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json
new file mode 100644
index 000000000..da1011371
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json
@@ -0,0 +1,44 @@
+[
+  {
+    "paramName": "la",
+    "paramLongName": "isLookUpUrl",
+    "paramDescription": "address of the ISLookUp service",
+    "paramRequired": true
+  },
+  {
+    "paramName": "asi",
+    "paramLongName": "actionSetId",
+    "paramDescription": "action set identifier (name of the orchestrator)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "path of the working directory",
+    "paramRequired": true
+  },
+  {
+    "paramName": "np",
+    "paramLongName": "numPartitions",
+    "paramDescription": "number of partitions for the similarity relations intermediate phases",
+    "paramRequired": false
+  },
+  {
+    "paramName": "purl",
+    "paramLongName": "postgresUrl",
+    "paramDescription": "the URL of the PostgreSQL server",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pusr",
+    "paramLongName": "postgresUser",
+    "paramDescription": "the owner of the PostgreSQL database",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ppwd",
+    "paramLongName": "postgresPassword",
+    "paramDescription": "the password for the PostgreSQL user",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 35c4c7026..59c850591 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -1,4 +1,3 @@
-
 package eu.dnetlib.dhp.oa.dedup;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -16,10 +15,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -52,6 +48,7 @@ public class SparkDedupTest implements Serializable {
 	private static String testOutputBasePath;
 	private static String testDedupGraphBasePath;
 	private static final String testActionSetId = "test-orchestrator";
+	private static String testDedupAssertionsBasePath;
 
 	@BeforeAll
 	public static void cleanUp() throws IOException, URISyntaxException {
@@ -66,6 +63,10 @@ public class SparkDedupTest implements Serializable {
 		testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
 			.toAbsolutePath()
 			.toString();
+		testDedupAssertionsBasePath = Paths
+			.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
+			.toFile()
+			.getAbsolutePath();
 
 		FileUtils.deleteDirectory(new File(testOutputBasePath));
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@@ -80,7 +81,8 @@ public class SparkDedupTest implements Serializable {
 			.getOrCreate();
 
 		jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-	}
+
+	}
 
 	@BeforeEach
 	public void setUp() throws IOException, ISLookUpException {
@@ -150,6 +152,7 @@ public class SparkDedupTest implements Serializable {
 					SparkCreateSimRels.class
 						.getResourceAsStream(
 							"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
+
 		parser
 			.parseArgument(
 				new String[] {
@@ -162,30 +165,30 @@ public class SparkDedupTest implements Serializable {
 
 		new SparkCreateSimRels(parser, spark).run(isLookUpService);
 
-		long orgs_simrel = spark
-			.read()
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
-			.count();
+		long orgs_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
+			.count();
 
-		long pubs_simrel = spark
-			.read()
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
-			.count();
+		long pubs_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
+			.count();
 
-		long sw_simrel = spark
-			.read()
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
-			.count();
+		long sw_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
+			.count();
 
-		long ds_simrel = spark
-			.read()
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
-			.count();
+		long ds_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
+			.count();
 
-		long orp_simrel = spark
-			.read()
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
-			.count();
+		long orp_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
+			.count();
 
 		assertEquals(3432, orgs_simrel);
 		assertEquals(7152, pubs_simrel);
@@ -194,8 +197,69 @@ public class SparkDedupTest implements Serializable {
 		assertEquals(6750, orp_simrel);
 	}
 
+	@Test
+	@Order(2)
+	public void collectSimRelsTest() throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkCreateSimRels.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
+		parser
+			.parseArgument(
+				new String[] {
+					"-asi", testActionSetId,
+					"-la", "lookupurl",
+					"-w", testOutputBasePath,
+					"-np", "50",
+					"-purl", "jdbc:postgresql://localhost:5432/dnet_dedup",
+					"-pusr", "postgres_url",
+					"-ppwd", ""
+				});
+
+		new SparkCollectSimRels(
+			parser,
+			spark,
+			spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"),
+			spark.read().load(testDedupAssertionsBasePath + "/groups")
+		).run(null);
+
+		long orgs_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
+			.count();
+
+		long pubs_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
+			.count();
+
+		long sw_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
+			.count();
+
+		long ds_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
+			.count();
+
+		long orp_simrel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
+			.count();
+
+		assertEquals(4022, orgs_simrel);
+		assertEquals(10575, pubs_simrel);
+		assertEquals(3767, sw_simrel);
+		assertEquals(3881, ds_simrel);
+		assertEquals(10173, orp_simrel);
+
+	}
+
 	@Test
-	@Order(2)
+	@Order(3)
 	public void cutMergeRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -204,6 +268,7 @@ public class SparkDedupTest implements Serializable {
 					SparkCreateMergeRels.class
 						.getResourceAsStream(
 							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+
 		parser
 			.parseArgument(
 				new String[] {
@@ -290,7 +355,7 @@ public class SparkDedupTest implements Serializable {
 	}
 
 	@Test
-	@Order(3)
+	@Order(4)
 	public void createMergeRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -299,6 +364,7 @@ public class SparkDedupTest implements Serializable {
 					SparkCreateMergeRels.class
 						.getResourceAsStream(
 							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+
 		parser
 			.parseArgument(
 				new String[] {
@@ -344,7 +410,7 @@ public class SparkDedupTest implements Serializable {
 	}
 
 	@Test
-	@Order(4)
+	@Order(5)
 	public void createDedupRecordTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -391,7 +457,7 @@ public class SparkDedupTest implements Serializable {
 	}
 
 	@Test
-	@Order(5)
+	@Order(6)
 	public void updateEntityTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -507,7 +573,7 @@ public class SparkDedupTest implements Serializable {
 	}
 
 	@Test
-	@Order(6)
+	@Order(7)
 	public void propagateRelationTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -557,7 +623,7 @@ public class SparkDedupTest implements Serializable {
 	}
 
 	@Test
-	@Order(7)
+	@Order(8)
 	public void testRelations() throws Exception {
 		testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
 		testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
@@ -575,11 +641,11 @@ public class SparkDedupTest implements Serializable {
 		assertEquals(expected_unique, rel.distinct().count());
 	}
 
-	@AfterAll
-	public static void finalCleanUp() throws IOException {
-		FileUtils.deleteDirectory(new File(testOutputBasePath));
-		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
-	}
+//	@AfterAll
+//	public static void finalCleanUp() throws IOException {
+//		FileUtils.deleteDirectory(new File(testOutputBasePath));
+//		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+//	}
 
 	public boolean isDeletedByInference(String s) {
 		return s.contains("\"deletedbyinference\":true");
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/._SUCCESS.crc b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/._SUCCESS.crc
new file mode 100644
index 000000000..3b7b04493
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/._SUCCESS.crc differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/.part-00000-4bafcd13-3995-4d26-9cf4-eae22806175b-c000.snappy.parquet.crc b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/.part-00000-4bafcd13-3995-4d26-9cf4-eae22806175b-c000.snappy.parquet.crc
new file mode 100644
index 000000000..de674144d
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/.part-00000-4bafcd13-3995-4d26-9cf4-eae22806175b-c000.snappy.parquet.crc differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/_SUCCESS b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/_SUCCESS
new file mode 100644
index 000000000..e69de29bb
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/part-00000-4bafcd13-3995-4d26-9cf4-eae22806175b-c000.snappy.parquet b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/part-00000-4bafcd13-3995-4d26-9cf4-eae22806175b-c000.snappy.parquet
new file mode 100644
index 000000000..ea7655139
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/groups/part-00000-4bafcd13-3995-4d26-9cf4-eae22806175b-c000.snappy.parquet differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/._SUCCESS.crc b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/._SUCCESS.crc
new file mode 100644
index 000000000..3b7b04493
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/._SUCCESS.crc differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/.part-00000-ad5faba8-5922-42f1-a215-1619e7bb4e5d-c000.snappy.parquet.crc b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/.part-00000-ad5faba8-5922-42f1-a215-1619e7bb4e5d-c000.snappy.parquet.crc
new file mode 100644
index 000000000..9ac084585
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/.part-00000-ad5faba8-5922-42f1-a215-1619e7bb4e5d-c000.snappy.parquet.crc differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/_SUCCESS b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/_SUCCESS
new file mode 100644
index 000000000..e69de29bb
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/part-00000-ad5faba8-5922-42f1-a215-1619e7bb4e5d-c000.snappy.parquet b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/part-00000-ad5faba8-5922-42f1-a215-1619e7bb4e5d-c000.snappy.parquet
new file mode 100644
index 000000000..e494e5ac0
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/assertions/similarity_groups/part-00000-ad5faba8-5922-42f1-a215-1619e7bb4e5d-c000.snappy.parquet differ