Merging with branch beta

Commit e9a20fc8f6

GraphCleaningFunctions.java

@@ -286,6 +286,12 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 
 	public static <T extends Oaf> T cleanup(T value, VocabularyGroup vocs) {
 
+		if (Objects.isNull(value.getDataInfo())) {
+			final DataInfo d = new DataInfo();
+			d.setDeletedbyinference(false);
+			value.setDataInfo(d);
+		}
+
 		if (value instanceof OafEntity) {
 
 			OafEntity e = (OafEntity) value;
@@ -305,6 +311,10 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 		} else if (value instanceof Result) {
 
 			Result r = (Result) value;
+			if (Objects.isNull(r.getContext())) {
+				r.setContext(new ArrayList<>());
+			}
+
 			if (Objects.nonNull(r.getFulltext())
 				&& (ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
 					ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
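
Both hunks apply the same defensive pattern: initialise a missing container with a sensible default before downstream cleaning steps dereference it. A minimal self-contained sketch of the pattern (the DataInfo stand-in below is hypothetical; the real class lives in the dhp-schemas module):

import java.util.Objects;

public class DefaultInitSketch {

	// Hypothetical stand-in for eu.dnetlib.dhp.schema.oaf.DataInfo.
	static class DataInfo {
		Boolean deletedbyinference;

		void setDeletedbyinference(Boolean b) {
			deletedbyinference = b;
		}
	}

	public static void main(String[] args) {
		DataInfo dataInfo = null; // simulates value.getDataInfo() returning null

		// Mirrors the guard added above: create a default only when the field
		// is missing, so explicitly set values are never overwritten.
		if (Objects.isNull(dataInfo)) {
			final DataInfo d = new DataInfo();
			d.setDeletedbyinference(false);
			dataInfo = d;
		}
		System.out.println(dataInfo.deletedbyinference); // prints: false
	}
}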

SparkBulkTagJob.java

@@ -27,6 +27,7 @@ import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.Datasource;
 import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 
 public class SparkBulkTagJob {
@@ -165,10 +166,12 @@ public class SparkBulkTagJob {
 	// TODO remove this hack as soon as the values fixed by this method will be provided as NON null
 	private static <R extends Result> MapFunction<R, R> patchResult() {
 		return r -> {
-			if (r.getDataInfo().getDeletedbyinference() == null) {
+			if (Objects.isNull(r.getDataInfo())) {
+				r.setDataInfo(OafMapperUtils.dataInfo(false, "", false, false, OafMapperUtils.unknown("", ""), ""));
+			} else if (r.getDataInfo().getDeletedbyinference() == null) {
 				r.getDataInfo().setDeletedbyinference(false);
 			}
-			if (r.getContext() == null) {
+			if (Objects.isNull(r.getContext())) {
 				r.setContext(new ArrayList<>());
 			}
 			return r;
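
The patch now synthesises a complete default DataInfo via OafMapperUtils.dataInfo(...) when the whole object is missing, instead of only back-filling the deletedbyinference flag. As a hedged sketch of an assumed call site (the variable names are illustrative and not taken from the diff), a MapFunction<R, R> like this one is typically applied to a typed Dataset with an explicit bean encoder:

// Assumed call site; Dataset.map(MapFunction, Encoder) is the standard Spark Java API.
Dataset<Result> patched = results.map(patchResult(), Encoders.bean(Result.class));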

PrepareResultCountrySet.java

@@ -5,10 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
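
Collapsing the four explicit java.util imports into a wildcard keeps the import list stable while the Optional- and Objects-based null handling in the hunk below pulls additional types from the same package.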
@@ -88,14 +85,33 @@ public class PrepareResultCountrySet {
 			// selects all the results non deleted by inference and non invisible
 			Dataset<R> result = readPath(spark, inputPath, resultClazz)
 				.filter(
-					(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
-						!r.getDataInfo().getInvisible());
+					(FilterFunction<R>) r -> Optional
+						.ofNullable(r.getDataInfo())
+						.map(dataInfo -> !dataInfo.getDeletedbyinference() && !dataInfo.getInvisible())
+						.orElse(true));
 
 			// of the results collects the distinct keys for collected from (at the level of the result) and hosted by
 			// and produces pairs resultId, key for each distinct key associated to the result
 			result.flatMap((FlatMapFunction<R, EntityEntityRel>) r -> {
-				Set<String> cfhb = r.getCollectedfrom().stream().map(cf -> cf.getKey()).collect(Collectors.toSet());
-				cfhb.addAll(r.getInstance().stream().map(i -> i.getHostedby().getKey()).collect(Collectors.toSet()));
+				Set<String> cfhb = Optional
+					.ofNullable(r.getCollectedfrom())
+					.map(cf -> cf.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
+					.orElse(new HashSet<>());
+				cfhb
+					.addAll(
+						Optional
+							.ofNullable(r.getInstance())
+							.map(
+								i -> i
+									.stream()
+									.map(
+										ii -> Optional
+											.ofNullable(ii.getHostedby())
+											.map(KeyValue::getKey)
+											.orElse(null))
+									.filter(Objects::nonNull)
+									.collect(Collectors.toSet()))
+							.orElse(new HashSet<>()));
 				return cfhb
 					.stream()
 					.map(value -> EntityEntityRel.newInstance(r.getId(), value))
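
Note the semantic change hidden in this refactoring: .orElse(true) means a result with no DataInfo now passes the filter, where the old code would have thrown a NullPointerException. A minimal self-contained sketch of just that filter logic (the DataInfo stand-in is hypothetical, mirroring the accessors used in the diff):

import java.util.Optional;

public class NullSafeFilterSketch {

	// Hypothetical stand-in for the schema's DataInfo, for illustration only.
	static class DataInfo {
		Boolean deletedbyinference;
		Boolean invisible;

		Boolean getDeletedbyinference() {
			return deletedbyinference;
		}

		Boolean getInvisible() {
			return invisible;
		}
	}

	// Same shape as the filter above: a result with no DataInfo is kept.
	static boolean keep(DataInfo dataInfo) {
		return Optional
			.ofNullable(dataInfo)
			.map(d -> !d.getDeletedbyinference() && !d.getInvisible())
			.orElse(true);
	}

	public static void main(String[] args) {
		System.out.println(keep(null)); // true: the old code threw an NPE here
	}
}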

config-default.xml

@@ -1,4 +1,12 @@
 <configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
     <property>
         <name>oozie.use.system.libpath</name>
         <value>true</value>
@@ -7,4 +15,28 @@
         <name>oozie.action.sharelib.for.spark</name>
         <value>spark2</value>
     </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
+    </property>
+    <property>
+        <name>spark2ExtraListeners</name>
+        <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+    </property>
+    <property>
+        <name>spark2SqlQueryExecutionListeners</name>
+        <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+    </property>
+    <property>
+        <name>sparkExecutorNumber</name>
+        <value>4</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/spark2ApplicationHistory</value>
+    </property>
 </configuration>
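
These are Oozie job defaults: the values are substituted for EL placeholders such as ${jobTracker}, ${nameNode} and ${sparkExecutorNumber} in the workflow definition, so declaring them here lets the workflow run without passing each one on every submission.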

pom.xml
@@ -888,7 +888,7 @@
 		<mockito-core.version>3.3.3</mockito-core.version>
 		<mongodb.driver.version>3.4.2</mongodb.driver.version>
 		<vtd.version>[2.12,3.0)</vtd.version>
-		<dhp-schemas.version>[3.17.1]</dhp-schemas.version>
+		<dhp-schemas.version>[3.17.2]</dhp-schemas.version>
 		<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
 		<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
 		<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
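
In Maven's version-range syntax a bracketed value like [3.17.2] pins the dependency to exactly that release (a hard requirement), while [2.12,3.0) accepts any version from 2.12 up to, but excluding, 3.0. The bump to 3.17.2 presumably picks up the schema-side changes the new OafMapperUtils calls rely on.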