This commit is contained in:
Claudio Atzori 2020-04-18 12:24:22 +02:00
parent 9374ff03ea
commit a2938dd059
3 changed files with 4 additions and 12 deletions

View File

@@ -27,15 +27,15 @@ public class DedupRecordFactory {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static <T extends OafEntity> Dataset<T> createDedupRecord(
- final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final Class<T> clazz, final DedupConfig dedupConf) {
+ final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final Class<T> clazz) {
long ts = System.currentTimeMillis();
//<id, json_entity>
Dataset<Tuple2<String, T>> entities = spark.read()
.textFile(entitiesInputPath)
- .map((MapFunction<String, Tuple2<String, T>>) it -> {
- T entity = OBJECT_MAPPER.readValue(it, clazz);
+ .map((MapFunction<String, Tuple2<String, T>>) s -> {
+ T entity = OBJECT_MAPPER.readValue(s, clazz);
return new Tuple2<>(entity.getId(), entity);
}, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));

View File

@@ -10,10 +10,7 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@@ -70,7 +67,7 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
- DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz, dedupConf)
+ DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
.map((MapFunction<OafEntity, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)

View File

@@ -95,7 +95,6 @@ public class SparkDedupTest implements Serializable {
IOUtils.toString(
SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser.parseArgument(new String[]{
"-mt", "local[*]",
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
@@ -120,7 +119,6 @@ public class SparkDedupTest implements Serializable {
IOUtils.toString(
SparkCreateMergeRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser.parseArgument(new String[]{
"-mt", "local[*]",
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
@@ -145,7 +143,6 @@ public class SparkDedupTest implements Serializable {
IOUtils.toString(
SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
parser.parseArgument(new String[]{
"-mt", "local[*]",
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
@@ -170,7 +167,6 @@ public class SparkDedupTest implements Serializable {
IOUtils.toString(
SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser.parseArgument(new String[]{
"-mt", "local[*]",
"-i", testGraphBasePath,
"-w", testOutputBasePath,
"-o", testDedupGraphBasePath
@@ -221,7 +217,6 @@ public class SparkDedupTest implements Serializable {
IOUtils.toString(
SparkPropagateRelation.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
parser.parseArgument(new String[]{
"-mt", "local[*]",
"-i", testGraphBasePath,
"-w", testOutputBasePath,
"-o", testDedupGraphBasePath