From c6bd13f0c8cf25161086ee248f1112c8aeaf14f5 Mon Sep 17 00:00:00 2001 From: miconis Date: Mon, 14 Oct 2024 09:48:34 +0200 Subject: [PATCH] optimization of the local test by removing unnecessary steps --- .../oa/dedup/local/SparkDedupLocalTest.java | 62 +------------------ 1 file changed, 1 insertion(+), 61 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java index 662c468e0..9e1e4ad40 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java @@ -59,7 +59,6 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils { .toURI()) .toFile() .getAbsolutePath(); - final String dedupConfPath = Paths .get( Objects @@ -68,7 +67,6 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils { .toFile() .getAbsolutePath(); - final static int MAX_ACCEPTANCE_DATE = 20; private static SparkDeduper deduper; private static SparkModel model; @@ -193,67 +191,9 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils { .where("relClass == 'merges'") .selectExpr("source as dedupId", "target as id"); - Dataset dedupRecords = mergeRelsRow - .join(kryoEntities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") - .select("dedupId", "id", "kryoObject") - .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) - .groupByKey((MapFunction, String>) Tuple3::_1, Encoders.STRING()) - .flatMapGroups( - (FlatMapGroupsFunction, OafEntity>) (dedupId, it) -> { - if (!it.hasNext()) - return Collections.emptyIterator(); - - final ArrayList cliques_ = new ArrayList<>(); - - final ArrayList aliases = new ArrayList<>(); - - final HashSet acceptanceDate = new HashSet<>(); - - while (it.hasNext()) { - Tuple3 t = it.next(); - OafEntity entity = t._3(); - - if (entity == null) { - aliases.add(t._2()); - } else { - cliques_.add(entity); - - if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { // max acceptance date - if (Result.class.isAssignableFrom(entity.getClass())) { - Result result = (Result) entity; - if (result.getDateofacceptance() != null - && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { - acceptanceDate.add(result.getDateofacceptance().getValue()); - } - } - } - } - - } - - if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques_.isEmpty()) { - return Collections.emptyIterator(); - } - - OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques_.iterator()); - // dedup records do not have date of transformation attribute - mergedEntity.setDateoftransformation(null); - - return Stream - .concat( - Stream - .of(dedupId) - .map(id -> DedupLocalTestUtils.createOafEntity(id, mergedEntity, before_dedupentity)), - aliases - .stream() - .map(id -> DedupLocalTestUtils.createOafEntity(id, mergedEntity, before_dedupentity))) - .iterator(); - - }, beanEncoder); - long dedupentity_time = System.currentTimeMillis() - before_dedupentity; - long dedupentity_number = dedupRecords.count(); + long dedupentity_number = mergeRelsRow.toJavaRDD().map(r -> r.getAs("dedupId")).distinct().count(); System.out.println("Number of simRels : " + simrels_number); System.out.println("Number of mergeRels : " + mergerels_number);