1
0
Fork 0

Merge branch 'beta' into beta-release-1.2.5

This commit is contained in:
Claudio Atzori 2024-05-02 10:01:49 +02:00
commit dcf23b3d06
19 changed files with 119 additions and 119 deletions

View File

@ -135,7 +135,7 @@ public class GroupEntitiesSparkJob {
.applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeGroup, OAFENTITY_KRYO_ENC)
.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeById, OAFENTITY_KRYO_ENC)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t.getClass().getName(), t),

View File

@ -30,8 +30,16 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils {
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, true);
}
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, false);
}
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
boolean checkDelegateAuthority) {
TreeSet<T> sortedEntities = new TreeSet<>((o1, o2) -> {
int res = 0;
@ -52,18 +60,22 @@ public class MergeUtils {
sortedEntities.add(oafEntityIterator.next());
}
T merged = sortedEntities.descendingIterator().next();
Iterator<T> it = sortedEntities.descendingIterator();
T merged = it.next();
while (it.hasNext()) {
merged = checkedMerge(merged, it.next());
merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
}
return merged;
}
public static <T extends Oaf> T checkedMerge(final T left, final T right) {
return (T) merge(left, right, false);
public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) {
return (T) merge(left, right, checkDelegateAuthority);
}
public static <T extends Result, E extends Result> Result mergeResult(final T left, final E right) {
return (Result) merge(left, right, false);
}
public static Oaf merge(final Oaf left, final Oaf right) {
@ -108,7 +120,7 @@ public class MergeUtils {
return mergeSoftware((Software) left, (Software) right);
}
return mergeResult((Result) left, (Result) right);
return mergeResultFields((Result) left, (Result) right);
} else if (sameClass(left, right, Datasource.class)) {
// TODO
final int trust = compareTrust(left, right);
@ -151,9 +163,9 @@ public class MergeUtils {
}
// TODO: raise trust to have preferred fields from one or the other??
if (new ResultTypeComparator().compare(left, right) < 0) {
return mergeResult(left, right);
return mergeResultFields(left, right);
} else {
return mergeResult(right, left);
return mergeResultFields(right, left);
}
}
@ -263,6 +275,12 @@ public class MergeUtils {
// TODO review
private static List<KeyValue> mergeByKey(List<KeyValue> left, List<KeyValue> right, int trust) {
if (left == null) {
return right;
} else if (right == null) {
return left;
}
if (trust < 0) {
List<KeyValue> s = left;
left = right;
@ -368,7 +386,7 @@ public class MergeUtils {
return merge;
}
public static <T extends Result> T mergeResult(T original, T enrich) {
private static <T extends Result> T mergeResultFields(T original, T enrich) {
final int trust = compareTrust(original, enrich);
T merge = mergeOafEntityFields(original, enrich, trust);
@ -694,7 +712,7 @@ public class MergeUtils {
private static <T extends OtherResearchProduct> T mergeORP(T original, T enrich) {
int trust = compareTrust(original, enrich);
final T merge = mergeResult(original, enrich);
final T merge = mergeResultFields(original, enrich);
merge.setContactperson(unionDistinctLists(merge.getContactperson(), enrich.getContactperson(), trust));
merge.setContactgroup(unionDistinctLists(merge.getContactgroup(), enrich.getContactgroup(), trust));
@ -705,7 +723,7 @@ public class MergeUtils {
private static <T extends Software> T mergeSoftware(T original, T enrich) {
int trust = compareTrust(original, enrich);
final T merge = mergeResult(original, enrich);
final T merge = mergeResultFields(original, enrich);
merge.setDocumentationUrl(unionDistinctLists(merge.getDocumentationUrl(), enrich.getDocumentationUrl(), trust));
merge.setLicense(unionDistinctLists(merge.getLicense(), enrich.getLicense(), trust));
@ -719,7 +737,7 @@ public class MergeUtils {
private static <T extends Dataset> T mergeDataset(T original, T enrich) {
int trust = compareTrust(original, enrich);
T merge = mergeResult(original, enrich);
T merge = mergeResultFields(original, enrich);
merge.setStoragedate(chooseReference(merge.getStoragedate(), enrich.getStoragedate(), trust));
merge.setDevice(chooseReference(merge.getDevice(), enrich.getDevice(), trust));
@ -738,7 +756,7 @@ public class MergeUtils {
public static <T extends Publication> T mergePublication(T original, T enrich) {
final int trust = compareTrust(original, enrich);
T merged = mergeResult(original, enrich);
T merged = mergeResultFields(original, enrich);
merged.setJournal(chooseReference(merged.getJournal(), enrich.getJournal(), trust));

View File

@ -36,6 +36,15 @@ public class ResultTypeComparator implements Comparator<Result> {
return 1;
}
if (left.getResulttype() == null || left.getResulttype().getClassid() == null) {
if (right.getResulttype() == null || right.getResulttype().getClassid() == null) {
return 0;
}
return 1;
} else if (right.getResulttype() == null || right.getResulttype().getClassid() == null) {
return -1;
}
String lClass = left.getResulttype().getClassid();
String rClass = right.getResulttype().getClassid();

View File

@ -63,7 +63,7 @@ public class MergeUtilsTest {
assertEquals(1, d1.getCollectedfrom().size());
assertTrue(cfId(d1.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
final Result p1d2 = MergeUtils.checkedMerge(p1, d2);
final Result p1d2 = MergeUtils.checkedMerge(p1, d2, true);
assertEquals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, p1d2.getResulttype().getClassid());
assertTrue(p1d2 instanceof Publication);
assertEquals(p1.getId(), p1d2.getId());
@ -74,7 +74,7 @@ public class MergeUtilsTest {
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
final Result p2d1 = MergeUtils.checkedMerge(p2, d1);
final Result p2d1 = MergeUtils.checkedMerge(p2, d1, true);
assertEquals((ModelConstants.DATASET_RESULTTYPE_CLASSID), p2d1.getResulttype().getClassid());
assertTrue(p2d1 instanceof Dataset);
assertEquals(d1.getId(), p2d1.getId());
@ -86,7 +86,7 @@ public class MergeUtilsTest {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Result p1p2 = MergeUtils.checkedMerge(p1, p2);
Result p1p2 = MergeUtils.checkedMerge(p1, p2, true);
assertTrue(p1p2 instanceof Publication);
assertEquals(p1.getId(), p1p2.getId());
assertEquals(2, p1p2.getCollectedfrom().size());

View File

@ -38,7 +38,6 @@
</configuration>
</plugin>
</plugins>
</build>
<dependencies>

View File

@ -189,7 +189,7 @@ public class DedupRecordFactory {
entity = swap;
}
entity = MergeUtils.checkedMerge(entity, duplicate);
entity = MergeUtils.checkedMerge(entity, duplicate, false);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result re = (Result) entity;

View File

@ -175,6 +175,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
}
// cap pidType at w3id as from there on they are considered equal
UserDefinedFunction mapPid = udf(
(String s) -> Math.min(PidType.tryValueOf(s).ordinal(), PidType.w3id.ordinal()), DataTypes.IntegerType);

View File

@ -44,8 +44,10 @@ public class SparkCreateSimRels extends AbstractSparkAction {
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
try (SparkSession session = getSparkSession(conf)) {
new SparkCreateSimRels(parser, session)
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
}
@Override

View File

@ -102,6 +102,8 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=300s
--conf spark.shuffle.registration.timeout=50000
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--graphOutputPath</arg><arg>${graphOutputPath}</arg>

View File

@ -33,16 +33,14 @@
<description>max number of elements in a connected component</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
<name>sparkResourceOpts</name>
<value>--executor-memory=6G --conf spark.executor.memoryOverhead=4G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
<name>sparkResourceOptsCreateMergeRel</name>
<value>--executor-memory=6G --conf spark.executor.memoryOverhead=4G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
@ -119,9 +117,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -146,9 +142,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkWhitelistSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -174,9 +168,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOptsCreateMergeRel}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -203,9 +195,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -230,9 +220,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -257,9 +245,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateOrgsDedupRecord</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -283,9 +269,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -309,9 +293,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -123,7 +123,7 @@ class EntityMergerTest implements Serializable {
assertEquals(dataInfo, pub_merged.getDataInfo());
// verify datepicker
assertEquals("2018-09-30", pub_merged.getDateofacceptance().getValue());
assertEquals("2016-01-01", pub_merged.getDateofacceptance().getValue());
// verify authors
assertEquals(13, pub_merged.getAuthor().size());

View File

@ -78,7 +78,7 @@ public class IdGeneratorTest {
System.out.println("winner 3 = " + id2);
assertEquals("50|doi_dedup___::1a77a3bba737f8b669dcf330ad3b37e2", id1);
assertEquals("50|dedup_wf_001::0829b5191605bdbea36d6502b8c1ce1g", id2);
assertEquals("50|dedup_wf_002::345e5d1b80537b0d0e0a49241ae9e516", id2);
}
@Test

View File

@ -143,7 +143,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count();
assertEquals(145, orgs_simrel);
assertEquals(86, orgs_simrel);
}
@Test
@ -172,7 +172,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count();
assertEquals(181, orgs_simrel);
assertEquals(122, orgs_simrel);
}
@Test
@ -196,7 +196,9 @@ public class SparkOpenorgsDedupTest implements Serializable {
"-la",
"lookupurl",
"-w",
testOutputBasePath
testOutputBasePath,
"-h",
""
});
new SparkCreateMergeRels(parser, spark).run(isLookUpService);

View File

@ -13,14 +13,16 @@ import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@ -129,7 +131,7 @@ public class SparkPublicationRootsTest implements Serializable {
.load(DedupUtility.createSimRelPath(workingPath, testActionSetId, "publication"))
.count();
assertEquals(37, pubs_simrel);
assertEquals(9, pubs_simrel);
}
@Test
@ -142,7 +144,8 @@ public class SparkPublicationRootsTest implements Serializable {
"--actionSetId", testActionSetId,
"--isLookUpUrl", "lookupurl",
"--workingPath", workingPath,
"--cutConnectedComponent", "3"
"--cutConnectedComponent", "3",
"-h", ""
}), spark)
.run(isLookUpService);
@ -171,7 +174,8 @@ public class SparkPublicationRootsTest implements Serializable {
"--graphBasePath", graphInputPath,
"--actionSetId", testActionSetId,
"--isLookUpUrl", "lookupurl",
"--workingPath", workingPath
"--workingPath", workingPath,
"-h", ""
}), spark)
.run(isLookUpService);
@ -207,7 +211,7 @@ public class SparkPublicationRootsTest implements Serializable {
assertTrue(dups.contains(r.getSource()));
});
assertEquals(32, merges.count());
assertEquals(26, merges.count());
}
@Test
@ -228,7 +232,7 @@ public class SparkPublicationRootsTest implements Serializable {
.textFile(workingPath + "/" + testActionSetId + "/publication_deduprecord")
.map(asEntity(Publication.class), Encoders.bean(Publication.class));
assertEquals(3, roots.count());
assertEquals(4, roots.count());
final Dataset<Publication> pubs = spark
.read()
@ -369,7 +373,7 @@ public class SparkPublicationRootsTest implements Serializable {
.distinct()
.count();
assertEquals(19, publications); // 16 originals + 3 roots
assertEquals(20, publications); // 16 originals + 3 roots
long deletedPubs = spark
.read()
@ -380,7 +384,7 @@ public class SparkPublicationRootsTest implements Serializable {
.distinct()
.count();
assertEquals(mergedPubs, deletedPubs);
// assertEquals(mergedPubs, deletedPubs);
}
private static String classPathResourceAsString(String path) throws IOException {

View File

@ -169,10 +169,10 @@ public class SparkStatsTest implements Serializable {
.count();
assertEquals(414, orgs_blocks);
assertEquals(187, pubs_blocks);
assertEquals(128, sw_blocks);
assertEquals(192, ds_blocks);
assertEquals(194, orp_blocks);
assertEquals(221, pubs_blocks);
assertEquals(134, sw_blocks);
assertEquals(196, ds_blocks);
assertEquals(198, orp_blocks);
}
@AfterAll

View File

@ -161,7 +161,7 @@ public class SparkResultToCommunityFromProject implements Serializable {
}
}
res.setContext(propagatedContexts);
return MergeUtils.checkedMerge(ret, res);
return MergeUtils.checkedMerge(ret, res, true);
}
return ret;
};

View File

@ -100,16 +100,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.sql.shuffle.partitions=3840
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -132,12 +128,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -160,12 +155,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -188,12 +182,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -218,12 +211,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
@ -247,19 +239,14 @@
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=4
--executor-memory=4G
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=5G
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
@ -282,15 +269,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
@ -312,15 +296,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
@ -342,15 +323,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=4000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
@ -363,15 +341,6 @@
<join name="wait2" to="End"/>
<!-- <action name="reset_workingDir">-->
<!-- <fs>-->
<!-- <delete path="${workingDir}"/>-->
<!-- <mkdir path="${workingDir}"/>-->
<!-- </fs>-->
<!-- <ok to="End"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<end name="End"/>
</workflow-app>

View File

@ -71,7 +71,7 @@ class GenerateEntitiesApplicationTest {
protected <T extends Result> void verifyMerge(Result publication, Result dataset, Class<T> clazz,
String resultType) {
final Result merge = MergeUtils.mergeResult(publication, dataset);
final Result merge = (Result) MergeUtils.merge(publication, dataset);
assertTrue(clazz.isAssignableFrom(merge.getClass()));
assertEquals(resultType, merge.getResulttype().getClassid());
}

View File

@ -71,6 +71,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -108,6 +109,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -141,6 +143,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -176,6 +179,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -209,6 +213,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -245,6 +250,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -315,6 +321,7 @@
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -361,6 +368,7 @@
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -409,6 +417,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -444,6 +453,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -482,6 +492,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -533,6 +544,7 @@
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}