enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
1 changed files with 18 additions and 0 deletions
Showing only changes of commit f057dcdf65 - Show all commits

View File

@ -3,8 +3,10 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@ -59,6 +61,8 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int MAX_EXTERNAL_ENTITIES = 50;
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@ -190,6 +194,20 @@ public class CreateRelatedEntitiesJob_phase2 {
(MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz))
.filter("dataInfo.invisible == false")
.map((MapFunction<E, E>) e -> {
if (ModelSupport.isSubClass(entityClazz, Result.class)) {
Result r = (Result) e;
if (r.getExternalReference() != null) {
List<ExternalReference> refs = r
.getExternalReference()
.stream()
.limit(MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList());
r.setExternalReference(refs);
}
}
return e;
}, Encoders.bean(entityClazz))
.map(
(MapFunction<E, TypedRow>) value -> getTypedRow(
StringUtils.substringAfterLast(inputEntityPath, "/"), value),