forked from D-Net/dnet-hadoop
Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop
This commit is contained in:
commit
2c4ed9a043
|
@ -53,7 +53,7 @@ public class EntityMergerTest implements Serializable {
|
|||
Software merged = DedupRecordFactory
|
||||
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
|
||||
|
||||
System.out.println(merged.getBestaccessright().getClassid());
|
||||
assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -16,6 +16,11 @@
|
|||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbSchema</name>
|
||||
<value>beta</value>
|
||||
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
|
@ -93,6 +98,7 @@
|
|||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||
</java>
|
||||
<ok to="ImportDB_claims"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -109,6 +115,7 @@
|
|||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||
<arg>--action</arg><arg>claims</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
|
|
|
@ -9,6 +9,7 @@ import java.util.Optional;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
|
@ -115,11 +116,21 @@ public class CreateRelatedEntitiesJob_phase1 {
|
|||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
|
||||
.cache();
|
||||
|
||||
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
|
||||
final String relatedEntityPath = outputPath + "_relatedEntity";
|
||||
readPathEntity(spark, inputEntityPath, clazz)
|
||||
.filter("dataInfo.invisible == false")
|
||||
.map(
|
||||
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
|
||||
Encoders.kryo(RelatedEntity.class))
|
||||
.repartition(5000)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.parquet(relatedEntityPath);
|
||||
|
||||
Dataset<Tuple2<String, RelatedEntity>> entities = spark
|
||||
.read()
|
||||
.load(relatedEntityPath)
|
||||
.as(Encoders.kryo(RelatedEntity.class))
|
||||
.map(
|
||||
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
|
||||
|
@ -165,13 +176,21 @@ public class CreateRelatedEntitiesJob_phase1 {
|
|||
Result result = (Result) entity;
|
||||
|
||||
if (result.getTitle() != null && !result.getTitle().isEmpty()) {
|
||||
re.setTitle(result.getTitle().stream().findFirst().get());
|
||||
final StructuredProperty title = result.getTitle().stream().findFirst().get();
|
||||
title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
|
||||
re.setTitle(title);
|
||||
}
|
||||
|
||||
re.setDateofacceptance(getValue(result.getDateofacceptance()));
|
||||
re.setPublisher(getValue(result.getPublisher()));
|
||||
re.setResulttype(result.getResulttype());
|
||||
re.setInstances(result.getInstance());
|
||||
re
|
||||
.setInstances(
|
||||
result
|
||||
.getInstance()
|
||||
.stream()
|
||||
.limit(ProvisionConstants.MAX_INSTANCES)
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
// TODO still to be mapped
|
||||
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
|
||||
|
|
|
@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 {
|
|||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final int MAX_EXTERNAL_ENTITIES = 50;
|
||||
private static final int MAX_AUTHORS = 200;
|
||||
private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
|
||||
private static final int MAX_TITLE_LENGTH = 5000;
|
||||
private static final int MAX_ABSTRACT_LENGTH = 100000;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
|
@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
|
|||
List<ExternalReference> refs = r
|
||||
.getExternalReference()
|
||||
.stream()
|
||||
.limit(MAX_EXTERNAL_ENTITIES)
|
||||
.limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
|
||||
.collect(Collectors.toList());
|
||||
r.setExternalReference(refs);
|
||||
}
|
||||
if (r.getAuthor() != null) {
|
||||
List<Author> authors = Lists.newArrayList();
|
||||
for (Author a : r.getAuthor()) {
|
||||
a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH));
|
||||
if (authors.size() < MAX_AUTHORS || hasORCID(a)) {
|
||||
a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
|
||||
if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
|
||||
authors.add(a);
|
||||
}
|
||||
}
|
||||
|
@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
|
|||
.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(d -> {
|
||||
d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH));
|
||||
d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
|
||||
return d;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
|
|||
.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(t -> {
|
||||
t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH));
|
||||
t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
|
||||
return t;
|
||||
})
|
||||
.limit(ProvisionConstants.MAX_TITLES)
|
||||
.collect(Collectors.toList());
|
||||
r.setTitle(titles);
|
||||
}
|
||||
|
|
|
@ -100,11 +100,17 @@ public class PrepareRelationsJob {
|
|||
.orElse(new HashSet<>());
|
||||
log.info("relationFilter: {}", relationFilter);
|
||||
|
||||
int maxRelations = Optional
|
||||
.ofNullable(parser.get("maxRelations"))
|
||||
int sourceMaxRelations = Optional
|
||||
.ofNullable(parser.get("sourceMaxRelations"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(MAX_RELS);
|
||||
log.info("maxRelations: {}", maxRelations);
|
||||
log.info("sourceMaxRelations: {}", sourceMaxRelations);
|
||||
|
||||
int targetMaxRelations = Optional
|
||||
.ofNullable(parser.get("targetMaxRelations"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(MAX_RELS);
|
||||
log.info("targetMaxRelations: {}", targetMaxRelations);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
|
@ -116,7 +122,8 @@ public class PrepareRelationsJob {
|
|||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
prepareRelationsRDD(
|
||||
spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions);
|
||||
spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations,
|
||||
relPartitions);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -129,31 +136,40 @@ public class PrepareRelationsJob {
|
|||
* @param inputRelationsPath source path for the graph relations
|
||||
* @param outputPath output path for the processed relations
|
||||
* @param relationFilter set of relation filters applied to the `relClass` field
|
||||
* @param maxRelations maximum number of allowed outgoing edges
|
||||
* @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source
|
||||
* @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target
|
||||
* @param relPartitions number of partitions for the output RDD
|
||||
*/
|
||||
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
|
||||
Set<String> relationFilter, int maxRelations, int relPartitions) {
|
||||
Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
|
||||
|
||||
// group by SOURCE and apply limit
|
||||
RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
|
||||
JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
|
||||
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
|
||||
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
|
||||
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
|
||||
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false);
|
||||
|
||||
JavaRDD<Relation> pruned = pruneRels(
|
||||
pruneRels(
|
||||
rels,
|
||||
sourceMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
|
||||
targetMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
|
||||
spark
|
||||
.createDataset(pruned.rdd(), Encoders.bean(Relation.class))
|
||||
.repartition(relPartitions)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.parquet(outputPath);
|
||||
}
|
||||
|
||||
private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
|
||||
int relPartitions, Function<Relation, String> idFn) {
|
||||
return rels
|
||||
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
|
||||
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
|
||||
.groupBy(Tuple2::_1)
|
||||
.map(Tuple2::_2)
|
||||
.map(t -> Iterables.limit(t, maxRelations))
|
||||
.flatMap(Iterable::iterator)
|
||||
.map(Tuple2::_2)
|
||||
.rdd();
|
||||
|
||||
spark
|
||||
.createDataset(bySource, Encoders.bean(Relation.class))
|
||||
.repartition(relPartitions)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.parquet(outputPath);
|
||||
.map(Tuple2::_2);
|
||||
}
|
||||
|
||||
// experimental
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.provision;
|
||||
|
||||
public class ProvisionConstants {
|
||||
|
||||
public static final int MAX_EXTERNAL_ENTITIES = 50;
|
||||
public static final int MAX_AUTHORS = 200;
|
||||
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
|
||||
public static final int MAX_TITLE_LENGTH = 5000;
|
||||
public static final int MAX_TITLES = 10;
|
||||
public static final int MAX_ABSTRACT_LENGTH = 100000;
|
||||
public static final int MAX_INSTANCES = 10;
|
||||
|
||||
}
|
|
@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
|
|||
private static final Map<String, Integer> weights = Maps.newHashMap();
|
||||
|
||||
static {
|
||||
weights.put("outcome", 0);
|
||||
weights.put("supplement", 1);
|
||||
weights.put("review", 2);
|
||||
weights.put("citation", 3);
|
||||
weights.put("affiliation", 4);
|
||||
weights.put("relationship", 5);
|
||||
weights.put("publicationDataset", 6);
|
||||
weights.put("similarity", 7);
|
||||
weights.put("participation", 0);
|
||||
|
||||
weights.put("provision", 8);
|
||||
weights.put("participation", 9);
|
||||
weights.put("dedup", 10);
|
||||
weights.put("outcome", 1);
|
||||
weights.put("affiliation", 2);
|
||||
weights.put("dedup", 3);
|
||||
weights.put("publicationDataset", 4);
|
||||
weights.put("citation", 5);
|
||||
weights.put("supplement", 6);
|
||||
weights.put("review", 7);
|
||||
weights.put("relationship", 8);
|
||||
weights.put("provision", 9);
|
||||
weights.put("similarity", 10);
|
||||
}
|
||||
|
||||
private static final long serialVersionUID = 3232323;
|
||||
|
|
|
@ -30,9 +30,16 @@
|
|||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "mr",
|
||||
"paramLongName": "maxRelations",
|
||||
"paramDescription": "maximum number of relations allowed for a each entity",
|
||||
"paramName": "smr",
|
||||
"paramLongName": "sourceMaxRelations",
|
||||
"paramDescription": "maximum number of relations allowed for a each entity grouping by source",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "tmr",
|
||||
"paramLongName": "targetMaxRelations",
|
||||
"paramDescription": "maximum number of relations allowed for a each entity grouping by target",
|
||||
"paramRequired": false
|
||||
}
|
||||
|
||||
]
|
||||
|
|
|
@ -18,8 +18,12 @@
|
|||
<description>filter applied reading relations (by relClass)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>maxRelations</name>
|
||||
<description>maximum number of relations allowed for a each entity</description>
|
||||
<name>sourceMaxRelations</name>
|
||||
<description>maximum number of relations allowed for a each entity grouping by source</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>targetMaxRelations</name>
|
||||
<description>maximum number of relations allowed for a each entity grouping by target</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>otherDsTypeId</name>
|
||||
|
@ -133,7 +137,8 @@
|
|||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
|
||||
<arg>--maxRelations</arg><arg>${maxRelations}</arg>
|
||||
<arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
|
||||
<arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
|
||||
<arg>--relationFilter</arg><arg>${relationFilter}</arg>
|
||||
<arg>--relPartitions</arg><arg>5000</arg>
|
||||
</spark>
|
||||
|
@ -166,7 +171,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
@ -193,7 +198,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
@ -220,7 +225,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=10000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
@ -247,7 +252,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
@ -274,7 +279,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
@ -301,7 +306,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
@ -328,7 +333,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
@ -367,7 +372,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15360
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
|
||||
|
@ -395,7 +400,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
--conf spark.sql.shuffle.partitions=10000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
|
||||
|
@ -423,7 +428,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
--conf spark.sql.shuffle.partitions=10000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
|
||||
|
@ -451,7 +456,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
|
||||
|
@ -479,7 +484,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
--conf spark.sql.shuffle.partitions=8000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
|
||||
|
@ -507,7 +512,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
--conf spark.sql.shuffle.partitions=10000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
|
||||
|
@ -535,7 +540,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
|
||||
|
@ -607,5 +612,4 @@
|
|||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
</workflow-app>
|
Loading…
Reference in New Issue