diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index da16e6dffe..1c957c9e09 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -27,15 +27,15 @@ public class DedupRecordFactory { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); public static Dataset createDedupRecord( - final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final Class clazz, final DedupConfig dedupConf) { + final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final Class clazz) { long ts = System.currentTimeMillis(); // Dataset> entities = spark.read() .textFile(entitiesInputPath) - .map((MapFunction>) it -> { - T entity = OBJECT_MAPPER.readValue(it, clazz); + .map((MapFunction>) s -> { + T entity = OBJECT_MAPPER.readValue(s, clazz); return new Tuple2<>(entity.getId(), entity); }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java index c2b1fc9c24..8e90d2a1ff 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java @@ -10,10 +10,7 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -70,7 +67,7 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); - DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz, dedupConf) + DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz) .map((MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING()) .write() .mode(SaveMode.Overwrite) diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index bcc5ddeaa6..01cbf04890 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -95,7 +95,6 @@ public class SparkDedupTest implements Serializable { IOUtils.toString( SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); parser.parseArgument(new String[]{ - "-mt", "local[*]", "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", @@ -120,7 +119,6 @@ public class SparkDedupTest implements Serializable { IOUtils.toString( SparkCreateMergeRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); parser.parseArgument(new String[]{ - "-mt", "local[*]", "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", @@ -145,7 +143,6 @@ public class SparkDedupTest implements Serializable { IOUtils.toString( SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); parser.parseArgument(new String[]{ - "-mt", "local[*]", "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", @@ -170,7 +167,6 @@ public class SparkDedupTest implements Serializable { IOUtils.toString( SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); parser.parseArgument(new String[]{ - "-mt", "local[*]", "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath @@ -221,7 +217,6 @@ public class SparkDedupTest implements Serializable { IOUtils.toString( SparkPropagateRelation.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); parser.parseArgument(new String[]{ - "-mt", "local[*]", "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath