optimization of the local test by removing unnecessary steps

parent 46db6b02d3
commit c6bd13f0c8
@@ -59,7 +59,6 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
					.toURI())
			.toFile()
			.getAbsolutePath();

	final String dedupConfPath = Paths
		.get(
			Objects
@@ -68,7 +67,6 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
			.toFile()
			.getAbsolutePath();

	final static int MAX_ACCEPTANCE_DATE = 20;

	private static SparkDeduper deduper;
	private static SparkModel model;
@@ -193,67 +191,9 @@ public class SparkDedupLocalTest extends DedupLocalTestUtils {
				.where("relClass == 'merges'")
				.selectExpr("source as dedupId", "target as id");

		Dataset<OafEntity> dedupRecords = mergeRelsRow
			.join(kryoEntities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
			.select("dedupId", "id", "kryoObject")
			.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
			.groupByKey((MapFunction<Tuple3<String, String, OafEntity>, String>) Tuple3::_1, Encoders.STRING())
			.flatMapGroups(
				(FlatMapGroupsFunction<String, Tuple3<String, String, OafEntity>, OafEntity>) (dedupId, it) -> {
					if (!it.hasNext())
						return Collections.emptyIterator();

					final ArrayList<OafEntity> cliques_ = new ArrayList<>();

					final ArrayList<String> aliases = new ArrayList<>();

					final HashSet<String> acceptanceDate = new HashSet<>();

					while (it.hasNext()) {
						Tuple3<String, String, OafEntity> t = it.next();
						OafEntity entity = t._3();

						if (entity == null) {
							aliases.add(t._2());
						} else {
							cliques_.add(entity);

							if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { // max acceptance date
								if (Result.class.isAssignableFrom(entity.getClass())) {
									Result result = (Result) entity;
									if (result.getDateofacceptance() != null
										&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
										acceptanceDate.add(result.getDateofacceptance().getValue());
									}
								}
							}
						}

					}

					if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques_.isEmpty()) {
						return Collections.emptyIterator();
					}

					OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques_.iterator());
					// dedup records do not have date of transformation attribute
					mergedEntity.setDateoftransformation(null);

					return Stream
						.concat(
							Stream
								.of(dedupId)
								.map(id -> DedupLocalTestUtils.createOafEntity(id, mergedEntity, before_dedupentity)),
							aliases
								.stream()
								.map(id -> DedupLocalTestUtils.createOafEntity(id, mergedEntity, before_dedupentity)))
						.iterator();

				}, beanEncoder);

		long dedupentity_time = System.currentTimeMillis() - before_dedupentity;

		long dedupentity_number = dedupRecords.count();
		long dedupentity_number = mergeRelsRow.toJavaRDD().map(r -> r.getAs("dedupId")).distinct().count();

		System.out.println("Number of simRels : " + simrels_number);
		System.out.println("Number of mergeRels : " + mergerels_number);