code formatting

Claudio Atzori 2024-01-23 08:47:12 +01:00
parent bd187ec6e7
commit 6fd25cf549
3 changed files with 158 additions and 149 deletions

RunSQLSparkJob.java
@@ -1,13 +1,7 @@
 package eu.dnetlib.dhp.oozie;
 
-import com.google.common.io.Resources;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import org.apache.commons.lang3.time.DurationFormatUtils;
-import org.apache.commons.text.StringSubstitutor;
-import org.apache.spark.SparkConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
@@ -15,7 +9,15 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Resources;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 
 public class RunSQLSparkJob {
 	private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);

DedupRecordFactory.java
@@ -1,6 +1,16 @@
 package eu.dnetlib.dhp.oa.dedup;
 
+import java.util.*;
+import java.util.stream.Stream;
+
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.sql.*;
+
 import eu.dnetlib.dhp.oa.dedup.model.Identifier;
 import eu.dnetlib.dhp.oa.merge.AuthorMerger;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
@@ -8,180 +18,176 @@ import eu.dnetlib.dhp.schema.oaf.Author
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConversions;

public class DedupRecordFactory {

	public static final class DedupRecordReduceState {
		public final String dedupId;
		public final ArrayList<String> aliases = new ArrayList<>();
		public final HashSet<String> acceptanceDate = new HashSet<>();
		public OafEntity entity;

		public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
			this.dedupId = dedupId;
			this.entity = entity;
			if (entity == null) {
				aliases.add(id);
			} else {
				if (Result.class.isAssignableFrom(entity.getClass())) {
					Result result = (Result) entity;
					if (result.getDateofacceptance() != null
						&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
						acceptanceDate.add(result.getDateofacceptance().getValue());
					}
				}
			}
		}

		public String getDedupId() {
			return dedupId;
		}
	}

	private static final int MAX_ACCEPTANCE_DATE = 20;

	private DedupRecordFactory() {
	}

	public static Dataset<OafEntity> createDedupRecord(
		final SparkSession spark,
		final DataInfo dataInfo,
		final String mergeRelsInputPath,
		final String entitiesInputPath,
		final Class<OafEntity> clazz) {

		final long ts = System.currentTimeMillis();
		final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
		final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);

		// <id, json_entity>
		Dataset<Row> entities = spark
			.read()
			.schema(Encoders.bean(clazz).schema())
			.json(entitiesInputPath)
			.as(beanEncoder)
			.map(
				(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
					return new Tuple2<>(entity.getId(), entity);
				},
				Encoders.tuple(Encoders.STRING(), kryoEncoder))
			.selectExpr("_1 AS id", "_2 AS kryoObject");

		// <source, target>: source is the dedup_id, target is the id of the mergedIn
		Dataset<Row> mergeRels = spark
			.read()
			.load(mergeRelsInputPath)
			.where("relClass == 'merges'")
			.selectExpr("source as dedupId", "target as id");

		return mergeRels
			.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
			.select("dedupId", "id", "kryoObject")
			.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
			.map(
				(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
					t._1(), t._2(), t._3()),
				Encoders.kryo(DedupRecordReduceState.class))
			.groupByKey(
				(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
			.reduceGroups(
				(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
					if (t1.entity == null) {
						t2.aliases.addAll(t1.aliases);
						return t2;
					}
					if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
						t1.acceptanceDate.addAll(t2.acceptanceDate);
					}
					t1.aliases.addAll(t2.aliases);
					t1.entity = reduceEntity(t1.entity, t2.entity);

					return t1;
				})
			.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
				String dedupId = t._1();
				DedupRecordReduceState agg = t._2();

				if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
					return Collections.emptyIterator();
				}

				return Stream
					.concat(Stream.of(agg.getDedupId()), agg.aliases.stream())
					.map(id -> {
						try {
							OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
							res.setId(id);
							res.setDataInfo(dataInfo);
							res.setLastupdatetimestamp(ts);
							return res;
						} catch (Exception e) {
							throw new RuntimeException(e);
						}
					})
					.iterator();
			}, beanEncoder);
	}

	private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {

		if (duplicate == null) {
			return entity;
		}

		int compare = new IdentifierComparator<>()
			.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));

		if (compare > 0) {
			OafEntity swap = duplicate;
			duplicate = entity;
			entity = swap;
		}

		entity.mergeFrom(duplicate);

		if (ModelSupport.isSubClass(duplicate, Result.class)) {
			Result re = (Result) entity;
			Result rd = (Result) duplicate;

			List<List<Author>> authors = new ArrayList<>();
			if (re.getAuthor() != null) {
				authors.add(re.getAuthor());
			}
			if (rd.getAuthor() != null) {
				authors.add(rd.getAuthor());
			}

			re.setAuthor(AuthorMerger.merge(authors));
		}

		return entity;
	}

	public static <T extends OafEntity> T entityMerger(
		String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {

		T base = entities.next()._2();

		while (entities.hasNext()) {
			T duplicate = entities.next()._2();
			if (duplicate != null)
				base = (T) reduceEntity(base, duplicate);
		}

		base.setId(id);
		base.setDataInfo(dataInfo);
		base.setLastupdatetimestamp(ts);

		return base;
	}
}
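
For reference, a minimal sketch of how DedupRecordFactory.createDedupRecord might be invoked from a dedup workflow; the local SparkSession, the empty DataInfo, the Publication entity class and all paths below are illustrative assumptions, not part of this commit:

	SparkSession spark = SparkSession
		.builder()
		.appName("CreateDedupRecord")
		.master("local[*]") // assumption: local run, the workflow normally submits to YARN
		.getOrCreate();

	DataInfo dataInfo = new DataInfo(); // hypothetical: provenance fields normally populated by the workflow
	@SuppressWarnings("unchecked")
	Class<OafEntity> clazz = (Class<OafEntity>) (Class<?>) eu.dnetlib.dhp.schema.oaf.Publication.class; // hypothetical entity type

	Dataset<OafEntity> dedupRecords = DedupRecordFactory
		.createDedupRecord(
			spark,
			dataInfo,
			"/working_dir/publication_mergerel", // hypothetical merge relations path
			"/graph/publication",                // hypothetical entities path
			clazz);

	dedupRecords.write().mode("overwrite").json("/working_dir/publication_deduprecord"); // hypothetical output path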

SparkCreateMergeRels.java
@@ -242,13 +242,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 				// this was a pivot in a previous graph but it has been merged into a new group with different
 				// pivot
-				if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
+				if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id)
+					&& !dedupId.equals(pivotDedupId)) {
 					// materialize the previous dedup record as a merge relation with the new one
 					res.add(new Tuple3<>(dedupId, pivotDedupId, null));
 				}
 
 				// add merge relations
-				if (cut <=0 || r.<Integer>getAs("position") <= cut) {
+				if (cut <= 0 || r.<Integer> getAs("position") <= cut) {
 					res.add(new Tuple3<>(id, pivotDedupId, pivot));
 				}