Merge branch 'beta' into rest-collector-plugin-with-retry

This commit is contained in:
Claudio Atzori 2024-05-10 09:02:21 +02:00
commit f7d56e2ef2
44 changed files with 562 additions and 3273 deletions

View File

@ -63,11 +63,13 @@
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-pace-core</artifactId>
<version>${project.version}</version>
<groupId>edu.cmu</groupId>
<artifactId>secondstring</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>

View File

@ -874,9 +874,11 @@ public class MergeUtils {
if (toEnrichInstances == null) {
return enrichmentResult;
}
if (enrichmentInstances == null) {
return enrichmentResult;
if (enrichmentInstances == null || enrichmentInstances.isEmpty()) {
return toEnrichInstances;
}
Map<String, Instance> ri = toInstanceMap(enrichmentInstances);
toEnrichInstances.forEach(i -> {

View File

@ -0,0 +1,101 @@
package eu.dnetlib.pace.common;
import java.nio.charset.StandardCharsets;
import java.text.Normalizer;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.ibm.icu.text.Transliterator;
/**
* Set of common functions for the framework
*
* @author claudio
*/
public class PaceCommonUtils {
// transliterator
protected static Transliterator transliterator = Transliterator.getInstance("Any-Eng");
protected static final String aliases_from = "⁰¹²³⁴⁵⁶⁷⁸⁹⁺⁻⁼⁽⁾ⁿ₀₁₂₃₄₅₆₇₈₉₊₋₌₍₎àáâäæãåāèéêëēėęəîïíīįìôöòóœøōõûüùúūßśšłžźżçćčñń";
protected static final String aliases_to = "0123456789+-=()n0123456789+-=()aaaaaaaaeeeeeeeeiiiiiioooooooouuuuussslzzzcccnn";
protected static Pattern hexUnicodePattern = Pattern.compile("\\\\u(\\p{XDigit}{4})");
protected static String fixAliases(final String s) {
final StringBuilder sb = new StringBuilder();
s.chars().forEach(ch -> {
final int i = StringUtils.indexOf(aliases_from, ch);
sb.append(i >= 0 ? aliases_to.charAt(i) : (char) ch);
});
return sb.toString();
}
protected static String transliterate(final String s) {
try {
return transliterator.transliterate(s);
} catch (Exception e) {
return s;
}
}
public static String normalize(final String s) {
return fixAliases(transliterate(nfd(unicodeNormalization(s))))
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError in case of large input
// strings
.replaceAll("[^ \\w]+", "")
.replaceAll("(\\p{InCombiningDiacriticalMarks})+", "")
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
.trim();
}
public static String nfd(final String s) {
return Normalizer.normalize(s, Normalizer.Form.NFD);
}
public static String unicodeNormalization(final String s) {
Matcher m = hexUnicodePattern.matcher(s);
StringBuffer buf = new StringBuffer(s.length());
while (m.find()) {
String ch = String.valueOf((char) Integer.parseInt(m.group(1), 16));
m.appendReplacement(buf, Matcher.quoteReplacement(ch));
}
m.appendTail(buf);
return buf.toString();
}
public static Set<String> loadFromClasspath(final String classpath) {
Transliterator transliterator = Transliterator.getInstance("Any-Eng");
final Set<String> h = Sets.newHashSet();
try {
for (final String s : IOUtils
.readLines(PaceCommonUtils.class.getResourceAsStream(classpath), StandardCharsets.UTF_8)) {
h.add(fixAliases(transliterator.transliterate(s))); // transliteration of the stopwords
}
} catch (final Throwable e) {
return Sets.newHashSet();
}
return h;
}
protected static Iterable<String> tokens(final String s, final int maxTokens) {
return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), maxTokens);
}
}

View File

@ -12,7 +12,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.common.PaceCommonUtils;
import eu.dnetlib.pace.util.Capitalise;
import eu.dnetlib.pace.util.DotAbbreviations;
@ -86,7 +86,7 @@ public class Person {
private List<String> splitTerms(final String s) {
if (particles == null) {
particles = AbstractPaceFunctions.loadFromClasspath("/eu/dnetlib/pace/config/name_particles.txt");
particles = PaceCommonUtils.loadFromClasspath("/eu/dnetlib/pace/config/name_particles.txt");
}
final List<String> list = Lists.newArrayList();

View File

@ -15,4 +15,4 @@ public class Capitalise implements Function<String, String> {
public String apply(final String s) {
return WordUtils.capitalize(s.toLowerCase(), DELIM);
}
};
}

View File

@ -8,4 +8,4 @@ public class DotAbbreviations implements Function<String, String> {
public String apply(String s) {
return s.length() == 1 ? s + "." : s;
}
};
}

View File

@ -49,6 +49,12 @@
</build>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>edu.cmu</groupId>
<artifactId>secondstring</artifactId>

View File

@ -1,32 +1,26 @@
package eu.dnetlib.pace.common;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.ibm.icu.text.Transliterator;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.text.Normalizer;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.ibm.icu.text.Transliterator;
import eu.dnetlib.pace.clustering.NGramUtils;
/**
* Set of common functions for the framework
*
* @author claudio
*/
public class AbstractPaceFunctions {
public class AbstractPaceFunctions extends PaceCommonUtils {
// city map to be used when translating the city names into codes
private static Map<String, String> cityMap = AbstractPaceFunctions
@ -41,9 +35,6 @@ public class AbstractPaceFunctions {
protected static Set<String> stopwords_it = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_it.txt");
protected static Set<String> stopwords_pt = loadFromClasspath("/eu/dnetlib/pace/config/stopwords_pt.txt");
// transliterator
protected static Transliterator transliterator = Transliterator.getInstance("Any-Eng");
// blacklist of ngrams: to avoid generic keys
protected static Set<String> ngramBlacklist = loadFromClasspath("/eu/dnetlib/pace/config/ngram_blacklist.txt");
@ -51,8 +42,6 @@ public class AbstractPaceFunctions {
public static final Pattern HTML_REGEX = Pattern.compile("<[^>]*>");
private static final String alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ";
private static final String aliases_from = "⁰¹²³⁴⁵⁶⁷⁸⁹⁺⁻⁼⁽⁾ⁿ₀₁₂₃₄₅₆₇₈₉₊₋₌₍₎àáâäæãåāèéêëēėęəîïíīįìôöòóœøōõûüùúūßśšłžźżçćčñń";
private static final String aliases_to = "0123456789+-=()n0123456789+-=()aaaaaaaaeeeeeeeeiiiiiioooooooouuuuussslzzzcccnn";
// doi prefix for normalization
public static final Pattern DOI_PREFIX = Pattern.compile("(https?:\\/\\/dx\\.doi\\.org\\/)|(doi:)");
@ -129,25 +118,6 @@ public class AbstractPaceFunctions {
return numberPattern.matcher(strNum).matches();
}
protected static String fixAliases(final String s) {
final StringBuilder sb = new StringBuilder();
s.chars().forEach(ch -> {
final int i = StringUtils.indexOf(aliases_from, ch);
sb.append(i >= 0 ? aliases_to.charAt(i) : (char) ch);
});
return sb.toString();
}
protected static String transliterate(final String s) {
try {
return transliterator.transliterate(s);
} catch (Exception e) {
return s;
}
}
protected static String removeSymbols(final String s) {
final StringBuilder sb = new StringBuilder();
@ -162,23 +132,6 @@ public class AbstractPaceFunctions {
return s != null;
}
public static String normalize(final String s) {
return fixAliases(transliterate(nfd(unicodeNormalization(s))))
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError in case of large input
// strings
.replaceAll("[^ \\w]+", "")
.replaceAll("(\\p{InCombiningDiacriticalMarks})+", "")
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
.trim();
}
public static String nfd(final String s) {
return Normalizer.normalize(s, Normalizer.Form.NFD);
}
public static String utf8(final String s) {
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
return new String(bytes, StandardCharsets.UTF_8);
@ -233,22 +186,6 @@ public class AbstractPaceFunctions {
return newset;
}
public static Set<String> loadFromClasspath(final String classpath) {
Transliterator transliterator = Transliterator.getInstance("Any-Eng");
final Set<String> h = Sets.newHashSet();
try {
for (final String s : IOUtils
.readLines(NGramUtils.class.getResourceAsStream(classpath), StandardCharsets.UTF_8)) {
h.add(fixAliases(transliterator.transliterate(s))); // transliteration of the stopwords
}
} catch (final Throwable e) {
return Sets.newHashSet();
}
return h;
}
public static Map<String, String> loadMapFromClasspath(final String classpath) {
Transliterator transliterator = Transliterator.getInstance("Any-Eng");
@ -303,10 +240,6 @@ public class AbstractPaceFunctions {
return StringUtils.substring(s, 0, 1).toLowerCase();
}
protected static Iterable<String> tokens(final String s, final int maxTokens) {
return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), maxTokens);
}
public static String normalizePid(String pid) {
return DOI_PREFIX.matcher(pid.toLowerCase()).replaceAll("");
}

View File

@ -80,9 +80,11 @@ public class PrepareFOSSparkJob implements Serializable {
fosDataset
.groupByKey((MapFunction<FOSDataModel, String>) v -> v.getOaid().toLowerCase(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, FOSDataModel, Result>) (k, it) -> {
return getResult(ModelSupport.getIdPrefix(Result.class) + "|" + k, it);
}, Encoders.bean(Result.class))
.mapGroups(
(MapGroupsFunction<String, FOSDataModel, Result>) (k,
it) -> getResult(
ModelSupport.entityIdPrefix.get(Result.class.getSimpleName().toLowerCase()) + "|" + k, it),
Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")

View File

@ -102,6 +102,8 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=300s
--conf spark.shuffle.registration.timeout=50000
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--graphOutputPath</arg><arg>${graphOutputPath}</arg>

View File

@ -33,16 +33,14 @@
<description>max number of elements in a connected component</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
<name>sparkResourceOpts</name>
<value>--executor-memory=6G --conf spark.executor.memoryOverhead=4G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
<name>sparkResourceOptsCreateMergeRel</name>
<value>--executor-memory=6G --conf spark.executor.memoryOverhead=4G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
@ -119,9 +117,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -146,9 +142,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkWhitelistSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -174,9 +168,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOptsCreateMergeRel}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -203,9 +195,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -230,9 +220,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -257,9 +245,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateOrgsDedupRecord</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -283,9 +269,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -309,9 +293,7 @@
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkResourceOpts}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -100,16 +100,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.sql.shuffle.partitions=3840
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -132,12 +128,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -160,12 +155,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -188,12 +182,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -218,12 +211,11 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
@ -247,19 +239,14 @@
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=4
--executor-memory=4G
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=5G
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
@ -282,15 +269,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
@ -312,15 +296,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
@ -342,15 +323,12 @@
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=4000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
@ -362,15 +340,6 @@
</action>
<join name="wait2" to="End"/>
<!-- <action name="reset_workingDir">-->
<!-- <fs>-->
<!-- <delete path="${workingDir}"/>-->
<!-- <mkdir path="${workingDir}"/>-->
<!-- </fs>-->
<!-- <ok to="End"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<end name="End"/>

View File

@ -90,6 +90,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-pace-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.graph.clean;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.SerializationUtils;
@ -29,7 +30,10 @@ public class CleaningRuleMap extends HashMap<Class<?>, SerializableConsumer<Obje
mapping.put(AccessRight.class, o -> cleanQualifier(vocabularies, (AccessRight) o));
mapping.put(Country.class, o -> cleanCountry(vocabularies, (Country) o));
mapping.put(Relation.class, o -> cleanRelation(vocabularies, (Relation) o));
mapping.put(Subject.class, o -> cleanSubject(vocabularies, (Subject) o));
// commenting out the subject cleaning until we decide if we want to it or not and the implementation will
// be completed. At the moment it is not capable of expanding the whole hierarchy.
// mapping.put(Subject.class, o -> cleanSubject(vocabularies, (Subject) o));
return mapping;
}
@ -38,8 +42,15 @@ public class CleaningRuleMap extends HashMap<Class<?>, SerializableConsumer<Obje
// TODO cleaning based on different subject vocabs can be added here
}
/**
* The procedure cleans out the subject values, using a vocabulary identified by the field subject.qualifier.classid.
*
* @param vocabularyId
* @param vocabularies
* @param subject
*/
private static void cleanSubjectForVocabulary(String vocabularyId, VocabularyGroup vocabularies,
Subject subject) {
Subject subject) {
vocabularies.find(vocabularyId).ifPresent(vocabulary -> {
if (ModelConstants.DNET_SUBJECT_KEYWORD.equalsIgnoreCase(subject.getQualifier().getClassid())) {
@ -49,14 +60,21 @@ public class CleaningRuleMap extends HashMap<Class<?>, SerializableConsumer<Obje
subject.getQualifier().setClassid(vocabularyId);
subject.getQualifier().setClassname(vocabulary.getName());
}
} else if (vocabularyId.equals(subject.getQualifier().getClassid()) &&
Objects.nonNull(subject.getDataInfo()) &&
!"subject:fos".equals(subject.getDataInfo().getProvenanceaction())) {
Qualifier syn = vocabulary.getSynonymAsQualifier(subject.getValue());
VocabularyTerm term = vocabulary.getTerm(subject.getValue());
if (Objects.isNull(syn) && Objects.isNull(term)) {
subject.getQualifier().setClassid(ModelConstants.DNET_SUBJECT_KEYWORD);
subject.getQualifier().setClassname(ModelConstants.DNET_SUBJECT_KEYWORD);
} else {
final String provenanceActionClassId = Optional.ofNullable(subject.getDataInfo())
.map(DataInfo::getProvenanceaction)
.map(Qualifier::getClassid)
.orElse(null);
if (vocabularyId.equals(subject.getQualifier().getClassid()) &&
!"subject:fos".equals(provenanceActionClassId)) {
Qualifier syn = vocabulary.getSynonymAsQualifier(subject.getValue());
VocabularyTerm term = vocabulary.getTerm(subject.getValue());
if (Objects.isNull(syn) && Objects.isNull(term)) {
subject.getQualifier().setClassid(ModelConstants.DNET_SUBJECT_KEYWORD);
subject.getQualifier().setClassname(ModelConstants.DNET_SUBJECT_KEYWORD);
}
}
}
});

View File

@ -153,10 +153,15 @@ public class CreateRelatedEntitiesJob_phase1 {
result
.getTitle()
.stream()
.filter(t -> StringUtils.isNotBlank(t.getValue()))
.findFirst()
.map(StructuredProperty::getValue)
.ifPresent(
title -> re.getTitle().setValue(StringUtils.left(title, ModelHardLimits.MAX_TITLE_LENGTH)));
title -> {
re.setTitle(title);
re
.getTitle()
.setValue(StringUtils.left(title.getValue(), ModelHardLimits.MAX_TITLE_LENGTH));
});
}
if (Objects.nonNull(result.getDescription()) && !result.getDescription().isEmpty()) {
result

View File

@ -3,24 +3,16 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import static org.apache.spark.sql.functions.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,9 +37,9 @@ import scala.Tuple2;
/**
* XmlConverterJob converts the JoinedEntities as XML records
*/
public class XmlConverterJob {
public class PayloadConverterJob {
private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
private static final Logger log = LoggerFactory.getLogger(PayloadConverterJob.class);
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
@ -56,8 +48,8 @@ public class XmlConverterJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
XmlConverterJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json")));
PayloadConverterJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_payload_converter.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@ -72,6 +64,12 @@ public class XmlConverterJob {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final Boolean validateXML = Optional
.ofNullable(parser.get("validateXML"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
log.info("validateXML: {}", validateXML);
final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
@ -86,18 +84,19 @@ public class XmlConverterJob {
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, outputPath);
convertToXml(
createPayloads(
spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl),
VocabularyGroup.loadVocsFromIS(isLookup));
VocabularyGroup.loadVocsFromIS(isLookup), validateXML);
});
}
private static void convertToXml(
private static void createPayloads(
final SparkSession spark,
final String inputPath,
final String outputPath,
final ContextMapper contextMapper,
final VocabularyGroup vocabularies) {
final VocabularyGroup vocabularies,
final Boolean validateXML) {
final XmlRecordFactory recordFactory = new XmlRecordFactory(
prepareAccumulators(spark.sparkContext()),
@ -118,7 +117,7 @@ public class XmlConverterJob {
.as(Encoders.kryo(JoinedEntity.class))
.map(
(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
recordFactory.build(je),
recordFactory.build(je, validateXML),
ProvisionModelSupport.transform(je, contextMapper, vocabularies)),
Encoders.tuple(Encoders.STRING(), Encoders.bean(SolrRecord.class)))
.map(

View File

@ -2,42 +2,34 @@
package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import java.util.HashSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
/**
* PrepareRelationsJob prunes the relationships: only consider relationships that are not virtually deleted
@ -130,132 +122,36 @@ public class PrepareRelationsJob {
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> !(rel.getSource().startsWith("unresolved") || rel.getTarget().startsWith("unresolved")))
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
.filter(rel -> !relationFilter.contains(StringUtils.lowerCase(rel.getRelClass())));
WindowSpec source_w = Window
.partitionBy("source", "subRelType")
.orderBy(col("target").desc_nulls_last());
JavaRDD<Relation> pruned = pruneRels(
pruneRels(
rels,
sourceMaxRelations, relPartitions, (Function<Relation, String>) Relation::getSource),
targetMaxRelations, relPartitions, (Function<Relation, String>) Relation::getTarget);
spark
.createDataset(pruned.rdd(), Encoders.bean(Relation.class))
.repartition(relPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
WindowSpec target_w = Window
.partitionBy("target", "subRelType")
.orderBy(col("source").desc_nulls_last());
private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
int relPartitions, Function<Relation, String> idFn) {
return rels
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator)
.map(Tuple2::_2);
}
// experimental
private static void prepareRelationsDataset(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
int relPartitions) {
spark
.read()
.textFile(inputRelationsPath)
.repartition(relPartitions)
.map(
(MapFunction<String, Relation>) s -> OBJECT_MAPPER.readValue(s, Relation.class),
Encoders.kryo(Relation.class))
.filter((FilterFunction<Relation>) rel -> !rel.getDataInfo().getDeletedbyinference())
.filter((FilterFunction<Relation>) rel -> !relationFilter.contains(rel.getRelClass()))
.groupByKey(
(MapFunction<Relation, String>) Relation::getSource,
Encoders.STRING())
.agg(new RelationAggregator(maxRelations).toColumn())
.flatMap(
(FlatMapFunction<Tuple2<String, RelationList>, Relation>) t -> Iterables
.limit(t._2().getRelations(), maxRelations)
.iterator(),
Encoders.bean(Relation.class))
.repartition(relPartitions)
.schema(Encoders.bean(Relation.class).schema())
.json(inputRelationsPath)
.where("source NOT LIKE 'unresolved%' AND target NOT LIKE 'unresolved%'")
.where("datainfo.deletedbyinference != true")
.where(
relationFilter.isEmpty() ? ""
: "lower(relClass) NOT IN ("
+ relationFilter.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")) + ")")
.withColumn("source_w_pos", functions.row_number().over(source_w))
.where("source_w_pos < " + sourceMaxRelations)
.drop("source_w_pos")
.withColumn("target_w_pos", functions.row_number().over(target_w))
.where("target_w_pos < " + targetMaxRelations)
.drop("target_w_pos")
.coalesce(relPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
public static class RelationAggregator
extends Aggregator<Relation, RelationList, RelationList> {
private final int maxRelations;
public RelationAggregator(int maxRelations) {
this.maxRelations = maxRelations;
}
@Override
public RelationList zero() {
return new RelationList();
}
@Override
public RelationList reduce(RelationList b, Relation a) {
b.getRelations().add(a);
return getSortableRelationList(b);
}
@Override
public RelationList merge(RelationList b1, RelationList b2) {
b1.getRelations().addAll(b2.getRelations());
return getSortableRelationList(b1);
}
@Override
public RelationList finish(RelationList r) {
return getSortableRelationList(r);
}
private RelationList getSortableRelationList(RelationList b1) {
RelationList sr = new RelationList();
sr
.setRelations(
b1
.getRelations()
.stream()
.limit(maxRelations)
.collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator()))));
return sr;
}
@Override
public Encoder<RelationList> bufferEncoder() {
return Encoders.kryo(RelationList.class);
}
@Override
public Encoder<RelationList> outputEncoder() {
return Encoders.kryo(RelationList.class);
}
}
/**
* Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* file,
*
* @param spark
* @param inputPath
* @return the JavaRDD<SortableRelation> containing all the relationships
*/
private static JavaRDD<Relation> readPathRelationRDD(
SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}

View File

@ -1,44 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationComparator implements Comparator<Relation> {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put(ModelConstants.OUTCOME, 0);
weights.put(ModelConstants.SUPPLEMENT, 1);
weights.put(ModelConstants.REVIEW, 2);
weights.put(ModelConstants.CITATION, 3);
weights.put(ModelConstants.AFFILIATION, 4);
weights.put(ModelConstants.RELATIONSHIP, 5);
weights.put(ModelConstants.PUBLICATION_DATASET, 6);
weights.put(ModelConstants.SIMILARITY, 7);
weights.put(ModelConstants.PROVISION, 8);
weights.put(ModelConstants.PARTICIPATION, 9);
weights.put(ModelConstants.DEDUP, 10);
}
private Integer getWeight(Relation o) {
return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
}
@Override
public int compare(Relation o1, Relation o2) {
return ComparisonChain
.start()
.compare(getWeight(o1), getWeight(o2))
.result();
}
}

View File

@ -1,25 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.Serializable;
import java.util.PriorityQueue;
import java.util.Queue;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationList implements Serializable {
private Queue<Relation> relations;
public RelationList() {
this.relations = new PriorityQueue<>(new RelationComparator());
}
public Queue<Relation> getRelations() {
return relations;
}
public void setRelations(Queue<Relation> relations) {
this.relations = relations;
}
}

View File

@ -1,81 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelation extends Relation implements Comparable<SortableRelation>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put(ModelConstants.OUTCOME, 0);
weights.put(ModelConstants.SUPPLEMENT, 1);
weights.put(ModelConstants.REVIEW, 2);
weights.put(ModelConstants.CITATION, 3);
weights.put(ModelConstants.AFFILIATION, 4);
weights.put(ModelConstants.RELATIONSHIP, 5);
weights.put(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, 6);
weights.put(ModelConstants.SIMILARITY, 7);
weights.put(ModelConstants.PROVISION, 8);
weights.put(ModelConstants.PARTICIPATION, 9);
weights.put(ModelConstants.DEDUP, 10);
}
private static final long serialVersionUID = 34753984579L;
private String groupingKey;
public static SortableRelation create(Relation r, String groupingKey) {
SortableRelation sr = new SortableRelation();
sr.setGroupingKey(groupingKey);
sr.setSource(r.getSource());
sr.setTarget(r.getTarget());
sr.setRelType(r.getRelType());
sr.setSubRelType(r.getSubRelType());
sr.setRelClass(r.getRelClass());
sr.setDataInfo(r.getDataInfo());
sr.setCollectedfrom(r.getCollectedfrom());
sr.setLastupdatetimestamp(r.getLastupdatetimestamp());
sr.setProperties(r.getProperties());
sr.setValidated(r.getValidated());
sr.setValidationDate(r.getValidationDate());
return sr;
}
@JsonIgnore
public Relation asRelation() {
return this;
}
@Override
public int compareTo(SortableRelation o) {
return ComparisonChain
.start()
.compare(getGroupingKey(), o.getGroupingKey())
.compare(getWeight(this), getWeight(o))
.result();
}
private Integer getWeight(SortableRelation o) {
return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
}
public String getGroupingKey() {
return groupingKey;
}
public void setGroupingKey(String groupingKey) {
this.groupingKey = groupingKey;
}
}

View File

@ -1,8 +1,6 @@
package eu.dnetlib.dhp.oa.provision.model;
import static org.apache.commons.lang3.StringUtils.substringBefore;
import java.io.StringReader;
import java.util.*;
import java.util.stream.Collectors;
@ -16,16 +14,15 @@ import org.jetbrains.annotations.Nullable;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm;
import eu.dnetlib.dhp.oa.provision.RelationList;
import eu.dnetlib.dhp.oa.provision.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.solr.*;
import eu.dnetlib.dhp.schema.solr.AccessRight;
import eu.dnetlib.dhp.schema.solr.Author;
@ -55,10 +52,7 @@ public class ProvisionModelSupport {
.newArrayList(
RelatedEntityWrapper.class,
JoinedEntity.class,
RelatedEntity.class,
SortableRelationKey.class,
SortableRelation.class,
RelationList.class));
RelatedEntity.class));
return modelClasses.toArray(new Class[] {});
}
@ -74,7 +68,11 @@ public class ProvisionModelSupport {
.setHeader(
SolrRecordHeader
.newInstance(
e.getId(), e.getOriginalId(), type, deletedbyinference));
StringUtils
.substringAfter(
e.getId(),
IdentifierFactory.ID_PREFIX_SEPARATOR),
e.getOriginalId(), type, deletedbyinference));
r.setCollectedfrom(asProvenance(e.getCollectedfrom()));
r.setContext(asContext(e.getContext(), contextMapper));
r.setPid(asPid(e.getPid()));
@ -114,7 +112,8 @@ public class ProvisionModelSupport {
.newInstance(
relation.getRelType(),
relation.getRelClass(),
relation.getTarget(), relatedRecordType));
StringUtils.substringAfter(relation.getTarget(), IdentifierFactory.ID_PREFIX_SEPARATOR),
relatedRecordType));
rr.setAcronym(re.getAcronym());
rr.setCode(re.getCode());

View File

@ -1,25 +1,23 @@
package eu.dnetlib.dhp.oa.provision.utils;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.authorPidTypes;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getRelDescriptor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.substringBefore;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.XmlInstance;
import eu.dnetlib.dhp.schema.common.*;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@ -31,27 +29,26 @@ import org.dom4j.Node;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.SAXReader;
import org.dom4j.io.XMLWriter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.XmlInstance;
import eu.dnetlib.dhp.schema.common.*;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
import scala.Tuple2;
import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.authorPidTypes;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getRelDescriptor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.substringBefore;
public class XmlRecordFactory implements Serializable {
/**
@ -93,10 +90,13 @@ public class XmlRecordFactory implements Serializable {
}
public String build(final JoinedEntity je) {
return build(je, false);
}
public String build(final JoinedEntity je, final Boolean validate) {
final Set<String> contexts = Sets.newHashSet();
// final OafEntity entity = toOafEntity(je.getEntity());
final OafEntity entity = je.getEntity();
final TemplateFactory templateFactory = new TemplateFactory();
try {
@ -122,8 +122,14 @@ public class XmlRecordFactory implements Serializable {
.buildBody(
mainType, metadata, relations, listChildren(entity, je, templateFactory), listExtraInfo(entity));
return templateFactory.buildRecord(entity, schemaLocation, body);
// return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
String xmlRecord = templateFactory.buildRecord(entity, schemaLocation, body);
if (Boolean.TRUE.equals(validate)) {
// rise an exception when an invalid record was built
new SAXReader().read(new StringReader(xmlRecord));
}
return xmlRecord;
// return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
} catch (final Throwable e) {
throw new RuntimeException(String.format("error building record '%s'", entity.getId()), e);
}
@ -1038,13 +1044,21 @@ public class XmlRecordFactory implements Serializable {
}
private List<String> measuresAsXml(List<Measure> measures) {
return measures
.stream()
.map(m -> {
List<Tuple2<String, String>> l = Lists.newArrayList(new Tuple2<>("id", m.getId()));
m.getUnit().forEach(kv -> l.add(new Tuple2<>(kv.getKey(), kv.getValue())));
return XmlSerializationUtils.asXmlElement("measure", l);
})
return Stream
.concat(
measures
.stream()
.filter(m -> !"downloads".equals(m.getId()) && !"views".equals(m.getId()))
.map(m -> {
List<Tuple2<String, String>> l = Lists.newArrayList(new Tuple2<>("id", m.getId()));
m.getUnit().forEach(kv -> l.add(new Tuple2<>(kv.getKey(), kv.getValue())));
return XmlSerializationUtils.asXmlElement("measure", l);
}),
measures
.stream()
.filter(m -> "downloads".equals(m.getId()) || "views".equals(m.getId()))
.filter(m -> m.getUnit().stream().anyMatch(u -> Integer.parseInt(u.getValue()) > 0))
.map(m -> XmlSerializationUtils.usageMeasureAsXmlElement("measure", m)))
.collect(Collectors.toList());
}

View File

@ -5,7 +5,11 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.removePrefix;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@ -166,6 +170,35 @@ public class XmlSerializationUtils {
return sb.toString();
}
// <measure downloads="0" views="0">infrastruct_::f66f1bd369679b5b077dcdf006089556||OpenAIRE</measure>
public static String usageMeasureAsXmlElement(String name, Measure measure) {
HashSet<String> dsIds = Optional
.ofNullable(measure.getUnit())
.map(
m -> m
.stream()
.map(KeyValue::getKey)
.collect(Collectors.toCollection(HashSet::new)))
.orElse(new HashSet<>());
StringBuilder sb = new StringBuilder();
dsIds.forEach(dsId -> {
sb
.append("<")
.append(name);
for (KeyValue kv : measure.getUnit()) {
sb.append(" ").append(attr(measure.getId(), kv.getValue()));
}
sb
.append(">")
.append(dsId)
.append("</")
.append(name)
.append(">");
});
return sb.toString();
}
public static String mapEoscIf(EoscIfGuidelines e) {
return asXmlElement(
"eoscifguidelines", Lists

View File

@ -22,5 +22,11 @@
"paramLongName": "isLookupUrl",
"paramDescription": "URL of the context ISLookup Service",
"paramRequired": true
},
{
"paramName": "val",
"paramLongName": "validateXML",
"paramDescription": "should the process check the XML validity",
"paramRequired": false
}
]

View File

@ -13,6 +13,11 @@
<name>contextApiBaseUrl</name>
<description>context API URL</description>
</property>
<property>
<name>validateXML</name>
<description>should the payload converter validate the XMLs</description>
<value>false</value>
</property>
<property>
<name>relPartitions</name>
<description>number or partitions for the relations Dataset</description>
@ -125,7 +130,7 @@
<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="create_payloads">${wf:conf('resumeFrom') eq 'create_payloads'}</case>
<case to="drop_solr_collection">${wf:conf('resumeFrom') eq 'drop_solr_collection'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
<default to="prepare_relations"/>
@ -144,21 +149,23 @@
<class>eu.dnetlib.dhp.oa.provision.PrepareRelationsJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--executor-cores=4
--executor-memory=6G
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.executor.memoryOverhead=6G
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
<arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
<arg>--relationFilter</arg><arg>${relationFilter}</arg>
<arg>--relPartitions</arg><arg>5000</arg>
<arg>--relPartitions</arg><arg>15000</arg>
</spark>
<ok to="fork_join_related_entities"/>
<error to="Kill"/>
@ -585,19 +592,20 @@
<error to="Kill"/>
</action>
<join name="wait_join_phase2" to="convert_to_xml"/>
<join name="wait_join_phase2" to="create_payloads"/>
<action name="convert_to_xml">
<action name="create_payloads">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>convert_to_xml</name>
<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
<name>create_payloads</name>
<class>eu.dnetlib.dhp.oa.provision.PayloadConverterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -605,8 +613,9 @@
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--inputPath</arg><arg>/user/claudio.atzori/data/beta_provision/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml_json</arg>
<arg>--validateXML</arg><arg>${validateXML}</arg>
<arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>

View File

@ -50,7 +50,7 @@ public class EOSCFuture_Test {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER
.readValue(

View File

@ -57,7 +57,7 @@ public class IndexRecordTransformerTest {
public void testPublicationRecordTransformation() throws IOException, TransformerException {
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Publication p = load("publication.json", Publication.class);
final Project pj = load("project.json", Project.class);
@ -82,7 +82,7 @@ public class IndexRecordTransformerTest {
void testPeerReviewed() throws IOException, TransformerException {
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Publication p = load("publication.json", Publication.class);
@ -98,7 +98,7 @@ public class IndexRecordTransformerTest {
public void testRiunet() throws IOException, TransformerException {
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Publication p = load("riunet.json", Publication.class);

View File

@ -37,7 +37,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@ -105,7 +105,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@ -136,7 +136,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@ -166,7 +166,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Datasource d = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("datasource.json")), Datasource.class);
@ -203,7 +203,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER
.readValue(
@ -226,7 +226,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER
.readValue(
@ -249,7 +249,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER
.readValue(

View File

@ -71,6 +71,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -108,6 +109,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -141,6 +143,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -176,6 +179,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -209,6 +213,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -245,6 +250,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -315,6 +321,7 @@
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -361,6 +368,7 @@
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -409,6 +417,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -444,6 +453,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -482,6 +492,7 @@
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -533,6 +544,7 @@
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -67,24 +67,21 @@ function copydb() {
if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
rm -f error.log
return 1
exit 2
fi
# Make Impala aware of the deletion of the old DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
# Using max memory of: 50 * 6144 = 300 Gb
# Using max-bandwidth of: 70 * 150 Mb/s = 10.5 Gb/s
# Using max memory of: 70 * 6144 = 430 Gb
# Using 1MB as a buffer-size.
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
# The " -Ddistcp.dynamic.recordsPerChunk=N" arg is not available in our version of hadoop
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
-numListstatusThreads 40 \
-copybuffersize 1048576 \
-strategy dynamic \
-blocksperchunk 8 \
-pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
@ -92,9 +89,9 @@ function copydb() {
if [ $? -eq 0 ]; then
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
return 2
exit 3
fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
@ -105,14 +102,11 @@ function copydb() {
# create the new database (with the same name)
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
# Make Impala aware of the creation of the new DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
all_create_view_statements=()
num_tables=0
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
@ -129,9 +123,11 @@ function copydb() {
all_create_view_statements+=("$create_view_statement")
else
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
((num_tables++))
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
exit 4 # Comment out when testing a DB which has such a table, just for performing this exact test-check.
else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
@ -142,74 +138,73 @@ function copydb() {
fi
done
echo -e "\nAll tables have been created, going to create the views..\n"
previous_num_of_views_to_retry=${#all_create_view_statements[@]}
if [[ $num_tables -gt 0 ]]; then
echo -e "\nAll ${num_tables} tables have been created, for db '${db}', going to create the ${previous_num_of_views_to_retry} views..\n"
else
echo -e "\nDB '${db}' does not have any tables, moving on to create the ${previous_num_of_views_to_retry} views..\n"
fi
# Time to loop through the views and create them.
# At this point all table-schemas should have been created.
previous_num_of_views_to_retry=${#all_create_view_statements}
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
# Make Impala aware of the new tables, so it knows them when creating the views.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nAll_create_view_statements (${previous_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" # DEBUG
else
echo -e "\nDB '${db}' does not contain any views.\n"
fi
level_counter=0
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
while [[ $previous_num_of_views_to_retry -gt 0 ]]; do
((level_counter++))
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
# In this case, we should retry creating this particular view again.
should_retry_create_view_statements=()
new_num_of_views_to_retry=0
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
if [ -n "$specific_errors" ]; then
echo -e "\nspecific_errors: ${specific_errors}\n"
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
should_retry_create_view_statements+=("$create_view_statement")
echo -e "\nView '$(cat error.log | grep -Eo "Query: CREATE VIEW ([^\s]+)" | sed 's/Query: CREATE VIEW //g')' failed to be created, possibly because it depends on another view.\n"
((new_num_of_views_to_retry++)) # Increment it here, instead of acquiring the array's size in the end, as that doesn't work for some reason.
else
all_create_view_statements=("${all_create_view_statements[@]/$create_view_statement}") # Remove the current successful statement from the list.
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
fi
done
new_num_of_views_to_retry=${#should_retry_create_view_statements}
all_create_view_statements=("$(echo "${all_create_view_statements[@]}" | grep -v '^[\s]*$')") # Re-index the array, filtering-out any empty elements.
# Although the above command reduces the "active" elements to just the few to-be-retried, it does not manage to make the array return the its true size through the "${#all_create_view_statements[@]}" statement. So we use counters.
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
return 3
exit 5
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
previous_num_of_views_to_retry=$new_num_of_views_to_retry
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
else
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
fi
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
previous_num_of_views_to_retry=$new_num_of_views_to_retry
done
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nComputing stats for tables..\n"
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
# Invalidate metadata of this DB's tables, in order for Impala to be aware of all parquet files put inside the tables' directories, previously, by "hadoop distcp".
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA ${db}.${i}"
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
fi
done
# Check if the entities in both clusters are the same, down to the exact names, not just the counts. (they are sorted in the same way both in hive and impala)
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
echo -e "\nAll entities have been copied to Impala cluster.\n"
else
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log
return 4
exit 6
fi
rm -f error.log

View File

@ -66,24 +66,21 @@ function copydb() {
if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
rm -f error.log
return 1
exit 2
fi
# Make Impala aware of the deletion of the old DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
# Using max memory of: 50 * 6144 = 300 Gb
# Using max-bandwidth of: 70 * 150 Mb/s = 10.5 Gb/s
# Using max memory of: 70 * 6144 = 430 Gb
# Using 1MB as a buffer-size.
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
# The " -Ddistcp.dynamic.recordsPerChunk=N" arg is not available in our version of hadoop
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
-numListstatusThreads 40 \
-copybuffersize 1048576 \
-strategy dynamic \
-blocksperchunk 8 \
-pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
@ -91,9 +88,9 @@ function copydb() {
if [ $? -eq 0 ]; then
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
return 2
exit 3
fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
@ -104,14 +101,11 @@ function copydb() {
# create the new database (with the same name)
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
# Make Impala aware of the creation of the new DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
all_create_view_statements=()
num_tables=0
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
@ -128,9 +122,11 @@ function copydb() {
all_create_view_statements+=("$create_view_statement")
else
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
((num_tables++))
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
exit 4 # Comment out when testing a DB which has such a table, just for performing this exact test-check.
else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
@ -141,74 +137,73 @@ function copydb() {
fi
done
echo -e "\nAll tables have been created, going to create the views..\n"
previous_num_of_views_to_retry=${#all_create_view_statements[@]}
if [[ $num_tables -gt 0 ]]; then
echo -e "\nAll ${num_tables} tables have been created, for db '${db}', going to create the ${previous_num_of_views_to_retry} views..\n"
else
echo -e "\nDB '${db}' does not have any tables, moving on to create the ${previous_num_of_views_to_retry} views..\n"
fi
# Time to loop through the views and create them.
# At this point all table-schemas should have been created.
previous_num_of_views_to_retry=${#all_create_view_statements}
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
# Make Impala aware of the new tables, so it knows them when creating the views.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nAll_create_view_statements (${previous_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" # DEBUG
else
echo -e "\nDB '${db}' does not contain any views.\n"
fi
level_counter=0
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
while [[ $previous_num_of_views_to_retry -gt 0 ]]; do
((level_counter++))
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
# In this case, we should retry creating this particular view again.
should_retry_create_view_statements=()
new_num_of_views_to_retry=0
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
if [ -n "$specific_errors" ]; then
echo -e "\nspecific_errors: ${specific_errors}\n"
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
should_retry_create_view_statements+=("$create_view_statement")
echo -e "\nView '$(cat error.log | grep -Eo "Query: CREATE VIEW ([^\s]+)" | sed 's/Query: CREATE VIEW //g')' failed to be created, possibly because it depends on another view.\n"
((new_num_of_views_to_retry++)) # Increment it here, instead of acquiring the array's size in the end, as that doesn't work for some reason.
else
all_create_view_statements=("${all_create_view_statements[@]/$create_view_statement}") # Remove the current successful statement from the list.
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
fi
done
new_num_of_views_to_retry=${#should_retry_create_view_statements}
all_create_view_statements=("$(echo "${all_create_view_statements[@]}" | grep -v '^[\s]*$')") # Re-index the array, filtering-out any empty elements.
# Although the above command reduces the "active" elements to just the few to-be-retried, it does not manage to make the array return the its true size through the "${#all_create_view_statements[@]}" statement. So we use counters.
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
return 3
exit 5
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
previous_num_of_views_to_retry=$new_num_of_views_to_retry
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
else
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
fi
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
previous_num_of_views_to_retry=$new_num_of_views_to_retry
done
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nComputing stats for tables..\n"
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
# Invalidate metadata of this DB's tables, in order for Impala to be aware of all parquet files put inside the tables' directories, previously, by "hadoop distcp".
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA ${db}.${i}"
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
fi
done
# Check if the entities in both clusters are the same, down to the exact names, not just the counts. (they are sorted in the same way both in hive and impala)
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
echo -e "\nAll entities have been copied to Impala cluster.\n"
else
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log
return 4
exit 6
fi
rm -f error.log

View File

@ -66,24 +66,21 @@ function copydb() {
if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
rm -f error.log
return 1
exit 2
fi
# Make Impala aware of the deletion of the old DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
# Using max memory of: 50 * 6144 = 300 Gb
# Using max-bandwidth of: 70 * 150 Mb/s = 10.5 Gb/s
# Using max memory of: 70 * 6144 = 430 Gb
# Using 1MB as a buffer-size.
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
# The " -Ddistcp.dynamic.recordsPerChunk=N" arg is not available in our version of hadoop
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
-numListstatusThreads 40 \
-copybuffersize 1048576 \
-strategy dynamic \
-blocksperchunk 8 \
-pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
@ -91,9 +88,9 @@ function copydb() {
if [ $? -eq 0 ]; then
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
return 2
exit 3
fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
@ -104,14 +101,11 @@ function copydb() {
# create the new database (with the same name)
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
# Make Impala aware of the creation of the new DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
all_create_view_statements=()
num_tables=0
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
@ -128,9 +122,11 @@ function copydb() {
all_create_view_statements+=("$create_view_statement")
else
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
((num_tables++))
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
exit 4 # Comment out when testing a DB which has such a table, just for performing this exact test-check.
else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
@ -141,74 +137,73 @@ function copydb() {
fi
done
echo -e "\nAll tables have been created, going to create the views..\n"
previous_num_of_views_to_retry=${#all_create_view_statements[@]}
if [[ $num_tables -gt 0 ]]; then
echo -e "\nAll ${num_tables} tables have been created, for db '${db}', going to create the ${previous_num_of_views_to_retry} views..\n"
else
echo -e "\nDB '${db}' does not have any tables, moving on to create the ${previous_num_of_views_to_retry} views..\n"
fi
# Time to loop through the views and create them.
# At this point all table-schemas should have been created.
previous_num_of_views_to_retry=${#all_create_view_statements}
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
# Make Impala aware of the new tables, so it knows them when creating the views.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nAll_create_view_statements (${previous_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" # DEBUG
else
echo -e "\nDB '${db}' does not contain any views.\n"
fi
level_counter=0
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
while [[ $previous_num_of_views_to_retry -gt 0 ]]; do
((level_counter++))
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
# In this case, we should retry creating this particular view again.
should_retry_create_view_statements=()
new_num_of_views_to_retry=0
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
if [ -n "$specific_errors" ]; then
echo -e "\nspecific_errors: ${specific_errors}\n"
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
should_retry_create_view_statements+=("$create_view_statement")
echo -e "\nView '$(cat error.log | grep -Eo "Query: CREATE VIEW ([^\s]+)" | sed 's/Query: CREATE VIEW //g')' failed to be created, possibly because it depends on another view.\n"
((new_num_of_views_to_retry++)) # Increment it here, instead of acquiring the array's size in the end, as that doesn't work for some reason.
else
all_create_view_statements=("${all_create_view_statements[@]/$create_view_statement}") # Remove the current successful statement from the list.
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
fi
done
new_num_of_views_to_retry=${#should_retry_create_view_statements}
all_create_view_statements=("$(echo "${all_create_view_statements[@]}" | grep -v '^[\s]*$')") # Re-index the array, filtering-out any empty elements.
# Although the above command reduces the "active" elements to just the few to-be-retried, it does not manage to make the array return the its true size through the "${#all_create_view_statements[@]}" statement. So we use counters.
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
return 3
exit 5
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
previous_num_of_views_to_retry=$new_num_of_views_to_retry
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
else
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
fi
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
previous_num_of_views_to_retry=$new_num_of_views_to_retry
done
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nComputing stats for tables..\n"
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
# Invalidate metadata of this DB's tables, in order for Impala to be aware of all parquet files put inside the tables' directories, previously, by "hadoop distcp".
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA ${db}.${i}"
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
fi
done
# Check if the entities in both clusters are the same, down to the exact names, not just the counts. (they are sorted in the same way both in hive and impala)
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
echo -e "\nAll entities have been copied to Impala cluster.\n"
else
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log
return 4
exit 6
fi
rm -f error.log

View File

@ -68,24 +68,21 @@ function copydb() {
if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
rm -f error.log
return 1
exit 2
fi
# Make Impala aware of the deletion of the old DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
# Using max memory of: 50 * 6144 = 300 Gb
# Using max-bandwidth of: 70 * 150 Mb/s = 10.5 Gb/s
# Using max memory of: 70 * 6144 = 430 Gb
# Using 1MB as a buffer-size.
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
# The " -Ddistcp.dynamic.recordsPerChunk=N" arg is not available in our version of hadoop
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
-numListstatusThreads 40 \
-copybuffersize 1048576 \
-strategy dynamic \
-blocksperchunk 8 \
-pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
@ -93,9 +90,9 @@ function copydb() {
if [ $? -eq 0 ]; then
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
return 2
exit 3
fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
@ -106,14 +103,11 @@ function copydb() {
# create the new database (with the same name)
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
# Make Impala aware of the creation of the new DB immediately.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
all_create_view_statements=()
num_tables=0
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
@ -130,9 +124,11 @@ function copydb() {
all_create_view_statements+=("$create_view_statement")
else
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
((num_tables++))
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
exit 4 # Comment out when testing a DB which has such a table, just for performing this exact test-check.
else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
@ -143,74 +139,73 @@ function copydb() {
fi
done
echo -e "\nAll tables have been created, going to create the views..\n"
previous_num_of_views_to_retry=${#all_create_view_statements[@]}
if [[ $num_tables -gt 0 ]]; then
echo -e "\nAll ${num_tables} tables have been created, for db '${db}', going to create the ${previous_num_of_views_to_retry} views..\n"
else
echo -e "\nDB '${db}' does not have any tables, moving on to create the ${previous_num_of_views_to_retry} views..\n"
fi
# Time to loop through the views and create them.
# At this point all table-schemas should have been created.
previous_num_of_views_to_retry=${#all_create_view_statements}
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
# Make Impala aware of the new tables, so it knows them when creating the views.
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nAll_create_view_statements (${previous_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" # DEBUG
else
echo -e "\nDB '${db}' does not contain any views.\n"
fi
level_counter=0
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
while [[ $previous_num_of_views_to_retry -gt 0 ]]; do
((level_counter++))
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
# In this case, we should retry creating this particular view again.
should_retry_create_view_statements=()
new_num_of_views_to_retry=0
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
if [ -n "$specific_errors" ]; then
echo -e "\nspecific_errors: ${specific_errors}\n"
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
should_retry_create_view_statements+=("$create_view_statement")
echo -e "\nView '$(cat error.log | grep -Eo "Query: CREATE VIEW ([^\s]+)" | sed 's/Query: CREATE VIEW //g')' failed to be created, possibly because it depends on another view.\n"
((new_num_of_views_to_retry++)) # Increment it here, instead of acquiring the array's size in the end, as that doesn't work for some reason.
else
all_create_view_statements=("${all_create_view_statements[@]/$create_view_statement}") # Remove the current successful statement from the list.
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
fi
done
new_num_of_views_to_retry=${#should_retry_create_view_statements}
all_create_view_statements=("$(echo "${all_create_view_statements[@]}" | grep -v '^[\s]*$')") # Re-index the array, filtering-out any empty elements.
# Although the above command reduces the "active" elements to just the few to-be-retried, it does not manage to make the array return the its true size through the "${#all_create_view_statements[@]}" statement. So we use counters.
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
return 3
exit 5
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
previous_num_of_views_to_retry=$new_num_of_views_to_retry
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
else
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
fi
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
previous_num_of_views_to_retry=$new_num_of_views_to_retry
done
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
sleep 1
echo -e "\nComputing stats for tables..\n"
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
# Invalidate metadata of this DB's tables, in order for Impala to be aware of all parquet files put inside the tables' directories, previously, by "hadoop distcp".
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA ${db}.${i}"
sleep 1
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
fi
done
# Check if the entities in both clusters are the same, down to the exact names, not just the counts. (they are sorted in the same way both in hive and impala)
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
echo -e "\nAll entities have been copied to Impala cluster.\n"
else
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log
return 4
exit 6
fi
rm -f error.log

View File

@ -1,3 +1,4 @@
set mapred.job.queue.name=analytics;
------------------------------------------------------
------------------------------------------------------
-- Additional relations

View File

@ -1,3 +1,5 @@
set mapred.job.queue.name=analytics;
------------------------------------------------------
------------------------------------------------------
-- Additional relations
@ -104,4 +106,4 @@ rel.properties[1].value apc_currency
from ${openaire_db_name}.relation rel
join ${openaire_db_name}.organization o on o.id=rel.source
join ${openaire_db_name}.result r on r.id=rel.target
where rel.subreltype = 'affiliation' and rel.datainfo.deletedbyinference = false and size(rel.properties)>0;
where rel.subreltype = 'affiliation' and rel.datainfo.deletedbyinference = false and size(rel.properties)>0;

View File

@ -1,3 +1,5 @@
set mapred.job.queue.name=analytics;
-------------------------------------------
--- Extra tables, mostly used by indicators
@ -63,4 +65,4 @@ from (
join ${stats_db_name}.result res on res.id=r.id
where r.amount is not null;
create or replace view ${stats_db_name}.issn_gold_oa_dataset as select * from ${external_stats_db_name}.issn_gold_oa_dataset;
create or replace view ${stats_db_name}.issn_gold_oa_dataset as select * from ${external_stats_db_name}.issn_gold_oa_dataset;

View File

@ -249,7 +249,7 @@ create table if not exists ${stats_db_name}.indi_pub_gold_oa stored as parquet a
left semi join dd on dd.id=pd.datasource
union all
select ra.id, 1 as is_gold
from ${stats_db_name}.result_accessroute ra on ra.id = pd.id where ra.accessroute = 'gold') tmp on tmp.id=pd.id; /*EOS*/
from ${stats_db_name}.result_accessroute ra where ra.accessroute = 'gold') tmp on tmp.id=pd.id; /*EOS*/
drop table if exists ${stats_db_name}.indi_pub_hybrid_oa_with_cc purge; /*EOS*/
create table if not exists ${stats_db_name}.indi_pub_hybrid_oa_with_cc stored as parquet as
@ -294,7 +294,7 @@ left outer join (
join ${stats_db_name}.indi_pub_gold_oa indi_gold on indi_gold.id=p.id
left outer join ${stats_db_name}.result_accessroute ra on ra.id=p.id
where indi_gold.is_gold=0 and
((d.type like '%Journal%' and ri.accessright not in ('Closed Access', 'Restricted', 'Not Available') and ri.license is not null) or ra.accessroute='hybrid')) tmp on pd.i=tmp.id; /*EOS*/
((d.type like '%Journal%' and ri.accessright not in ('Closed Access', 'Restricted', 'Not Available') and ri.license is not null) or ra.accessroute='hybrid')) tmp on p.id=tmp.id; /*EOS*/
drop table if exists ${stats_db_name}.indi_org_fairness purge; /*EOS*/
create table if not exists ${stats_db_name}.indi_org_fairness stored as parquet as
@ -380,7 +380,7 @@ CREATE TEMPORARY VIEW allresults as
drop table if exists ${stats_db_name}.indi_org_fairness_pub purge; /*EOS*/
create table if not exists ${stats_db_name}.indi_org_fairness_pub as
create table if not exists ${stats_db_name}.indi_org_fairness_pub stored as parquet as
select ar.organization, rf.no_result_fair/ar.no_allresults org_fairness
from allresults ar join result_fair rf
on rf.organization=ar.organization; /*EOS*/
@ -639,7 +639,7 @@ from ${stats_db_name}.publication p
drop table if exists ${stats_db_name}.indi_result_with_pid purge; /*EOS*/
create table if not exists ${stats_db_name}.indi_result_with_pid as
create table if not exists ${stats_db_name}.indi_result_with_pid stored as parquet as
select distinct p.id, coalesce(result_with_pid, 0) as result_with_pid
from ${stats_db_name}.result p
left outer join (
@ -653,7 +653,7 @@ group by rf.id; /*EOS*/
drop table if exists ${stats_db_name}.indi_pub_interdisciplinarity purge; /*EOS*/
create table if not exists ${stats_db_name}.indi_pub_interdisciplinarity as
create table if not exists ${stats_db_name}.indi_pub_interdisciplinarity stored as parquet as
select distinct p.id as id, coalesce(is_interdisciplinary, 0)
as is_interdisciplinary
from pub_fos_totals p
@ -1006,14 +1006,14 @@ left outer join (
drop table if exists ${stats_db_name}.result_country purge; /*EOS*/
create table ${stats_db_name}.result_country stored as parquet as
select distinct *
select distinct id, country
from (
select ro.id, o.country
from ${stats_db_name}.result_organization ro
left outer join ${stats_db_name}.organization o on o.id=ro.organization
union all
select rp.id, f.country
from ${stats_db_name}.result_projects
from ${stats_db_name}.result_projects rp
left outer join ${stats_db_name}.project p on p.id=rp.project
left outer join ${stats_db_name}.funder f on f.name=p.funder
) rc

View File

@ -1,3 +1,5 @@
set mapred.job.queue.name=analytics;
----------------------------------------------------
-- Shortcuts for various definitions in stats db ---
----------------------------------------------------
@ -25,4 +27,4 @@ drop table if exists ${stats_db_name}.result_gold purge;
create table IF NOT EXISTS ${stats_db_name}.result_gold STORED AS PARQUET as
select r.id, case when gold.is_gold=1 then true else false end as gold
from ${stats_db_name}.result r
left outer join ${stats_db_name}.indi_pub_gold_oa gold on gold.id=r.id;
left outer join ${stats_db_name}.indi_pub_gold_oa gold on gold.id=r.id;

View File

@ -1,3 +1,5 @@
set mapred.job.queue.name=analytics;
-- replace the creation of the result view to include the boolean fields from the previous tables (green, gold,
-- peer reviewed)
drop table if exists ${stats_db_name}.result_tmp;
@ -53,4 +55,4 @@ LEFT OUTER JOIN ${stats_db_name}.result_gold gold on gold.id=r.id;
drop table if exists ${stats_db_name}.result;
drop view if exists ${stats_db_name}.result;
create table ${stats_db_name}.result stored as parquet as select * from ${stats_db_name}.result_tmp;
drop table ${stats_db_name}.result_tmp;
drop table ${stats_db_name}.result_tmp;

View File

@ -1,3 +1,5 @@
set mapred.job.queue.name=analytics;
--------------------------------------------------------------
--------------------------------------------------------------
-- Publication table/view and Publication related tables/views
@ -111,4 +113,4 @@ SELECT substr(p.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type=
FROM ${openaire_db_name}.publication p
lateral view explode(p.extrainfo) citations AS citation
WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") != ""
and p.datainfo.deletedbyinference = false and p.datainfo.invisible=false;
and p.datainfo.deletedbyinference = false and p.datainfo.invisible=false;

View File

@ -368,6 +368,7 @@
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
--queue analytics
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hive_metastore_uris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16-createIndicatorsTables.sql</arg>
@ -551,4 +552,4 @@
</action>
<end name="End"/>
</workflow-app>
</workflow-app>

View File

@ -30,6 +30,10 @@
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx16g</value>
</property>
</configuration>
</global>