
Fixed the property mapping that creates the RelatedEntity transient objects. Spark cores & memory adjustments. Code formatting

Claudio Atzori 2024-05-07 16:25:17 +02:00
parent 711048ceed
commit b4e3389432
3 changed files with 54 additions and 37 deletions

View File

@@ -153,10 +153,15 @@ public class CreateRelatedEntitiesJob_phase1 {
 			result
 				.getTitle()
 				.stream()
+				.filter(t -> StringUtils.isNotBlank(t.getValue()))
 				.findFirst()
-				.map(StructuredProperty::getValue)
 				.ifPresent(
-					title -> re.getTitle().setValue(StringUtils.left(title, ModelHardLimits.MAX_TITLE_LENGTH)));
+					title -> {
+						re.setTitle(title);
+						re
+							.getTitle()
+							.setValue(StringUtils.left(title.getValue(), ModelHardLimits.MAX_TITLE_LENGTH));
+					});
 		}
 		if (Objects.nonNull(result.getDescription()) && !result.getDescription().isEmpty()) {
 			result
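Read outside the diff, the change to the title mapping amounts to the following (a sketch assembled from the hunk above; RelatedEntity, StructuredProperty and ModelHardLimits are the dnet-hadoop model classes it references, and the assumption that re.getTitle() was still null on the freshly created transient RelatedEntity is inferred from the commit message, not stated in the diff):

	// Before: only the title string was extracted, and setValue() was invoked on
	// re.getTitle(), which presumably had never been set on the transient object.
	result
		.getTitle()
		.stream()
		.findFirst()
		.map(StructuredProperty::getValue)
		.ifPresent(
			title -> re.getTitle().setValue(StringUtils.left(title, ModelHardLimits.MAX_TITLE_LENGTH)));

	// After: the first non-blank StructuredProperty is copied onto the RelatedEntity
	// as a whole, then its value is truncated in place to the hard limit.
	result
		.getTitle()
		.stream()
		.filter(t -> StringUtils.isNotBlank(t.getValue()))
		.findFirst()
		.ifPresent(title -> {
			re.setTitle(title);
			re.getTitle().setValue(StringUtils.left(title.getValue(), ModelHardLimits.MAX_TITLE_LENGTH));
		});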

View File

@@ -1,14 +1,15 @@
 package eu.dnetlib.dhp.oa.provision;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Sets;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.col;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Encoders;
@@ -20,12 +21,15 @@ import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import static org.apache.spark.sql.functions.col;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 
 /**
  * PrepareRelationsJob prunes the relationships: only consider relationships that are not virtually deleted
@@ -119,27 +123,33 @@ public class PrepareRelationsJob {
 		Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
 
 		WindowSpec source_w = Window
 			.partitionBy("source", "subRelType")
 			.orderBy(col("target").desc_nulls_last());
 
 		WindowSpec target_w = Window
 			.partitionBy("target", "subRelType")
 			.orderBy(col("source").desc_nulls_last());
 
-		spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelationsPath)
+		spark
+			.read()
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(inputRelationsPath)
 			.where("source NOT LIKE 'unresolved%' AND target NOT LIKE 'unresolved%'")
 			.where("datainfo.deletedbyinference != true")
-			.where(relationFilter.isEmpty() ? "" : "lower(relClass) NOT IN (" + Joiner.on(',').join(relationFilter) + ")")
+			.where(
+				relationFilter.isEmpty() ? ""
+					: "lower(relClass) NOT IN ("
+						+ relationFilter.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")) + ")")
 			.withColumn("source_w_pos", functions.row_number().over(source_w))
-			.where("source_w_pos < " + sourceMaxRelations )
+			.where("source_w_pos < " + sourceMaxRelations)
 			.drop("source_w_pos")
 			.withColumn("target_w_pos", functions.row_number().over(target_w))
 			.where("target_w_pos < " + targetMaxRelations)
-			.drop( "target_w_pos")
+			.drop("target_w_pos")
 			.coalesce(relPartitions)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.parquet(outputPath);
 	}
 
 	private static void removeOutputDir(SparkSession spark, String path) {
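The relClass filter is the behavioural part of this hunk: the old clause interpolated the filter values bare, while the new one wraps each value in single quotes so Spark SQL treats them as string literals. A standalone sketch of the two clause builders (the filter values here are hypothetical; Guava's Joiner is assumed on the classpath, as in the original imports):

	import java.util.LinkedHashSet;
	import java.util.List;
	import java.util.Set;
	import java.util.stream.Collectors;

	import com.google.common.base.Joiner;

	public class RelClassFilterClauseDemo {

		public static void main(String[] args) {
			// Hypothetical filter values, kept in insertion order for a stable output.
			Set<String> relationFilter = new LinkedHashSet<>(List.of("merges", "ismergedin"));

			// Old clause: values are joined without quotes, so the generated SQL
			// compares lower(relClass) against identifiers rather than string literals.
			String oldClause = "lower(relClass) NOT IN (" + Joiner.on(',').join(relationFilter) + ")";

			// New clause: each value is wrapped in single quotes before joining,
			// yielding a valid NOT IN list of string literals.
			String newClause = "lower(relClass) NOT IN ("
				+ relationFilter.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")) + ")";

			System.out.println(oldClause); // lower(relClass) NOT IN (merges,ismergedin)
			System.out.println(newClause); // lower(relClass) NOT IN ('merges','ismergedin')
		}
	}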

View File

@@ -144,21 +144,23 @@
 			<class>eu.dnetlib.dhp.oa.provision.PrepareRelationsJob</class>
 			<jar>dhp-graph-provision-${projectVersion}.jar</jar>
 			<spark-opts>
-				--executor-cores=${sparkExecutorCoresForJoining}
-				--executor-memory=${sparkExecutorMemoryForJoining}
+				--executor-cores=4
+				--executor-memory=6G
 				--driver-memory=${sparkDriverMemoryForJoining}
+				--conf spark.executor.memoryOverhead=6G
 				--conf spark.extraListeners=${spark2ExtraListeners}
 				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-				--conf spark.sql.shuffle.partitions=3840
+				--conf spark.sql.shuffle.partitions=15000
+				--conf spark.network.timeout=${sparkNetworkTimeout}
 			</spark-opts>
 			<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
 			<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
 			<arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
 			<arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
 			<arg>--relationFilter</arg><arg>${relationFilter}</arg>
-			<arg>--relPartitions</arg><arg>5000</arg>
+			<arg>--relPartitions</arg><arg>15000</arg>
 		</spark>
 		<ok to="fork_join_related_entities"/>
 		<error to="Kill"/>