forked from D-Net/dnet-hadoop
mergin with branch beta
This commit is contained in:
commit
9eeb9f5d32
|
@ -92,6 +92,8 @@ public class GraphCleaningFunctions extends CleaningFunctions {
|
||||||
INVALID_AUTHOR_NAMES.add("null anonymous");
|
INVALID_AUTHOR_NAMES.add("null anonymous");
|
||||||
INVALID_AUTHOR_NAMES.add("unbekannt");
|
INVALID_AUTHOR_NAMES.add("unbekannt");
|
||||||
INVALID_AUTHOR_NAMES.add("unknown");
|
INVALID_AUTHOR_NAMES.add("unknown");
|
||||||
|
INVALID_AUTHOR_NAMES.add("autor, Sin");
|
||||||
|
INVALID_AUTHOR_NAMES.add("Desconocido / Inconnu,");
|
||||||
|
|
||||||
INVALID_URL_HOSTS.add("creativecommons.org");
|
INVALID_URL_HOSTS.add("creativecommons.org");
|
||||||
INVALID_URL_HOSTS.add("www.academia.edu");
|
INVALID_URL_HOSTS.add("www.academia.edu");
|
||||||
|
|
|
@ -497,9 +497,14 @@ public class MergeUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Field<String> selectOldestDate(Field<String> d1, Field<String> d2) {
|
private static Field<String> selectOldestDate(Field<String> d1, Field<String> d2) {
|
||||||
|
if (d1 == null || StringUtils.isBlank(d1.getValue())) {
|
||||||
|
return d2;
|
||||||
|
} else if (d2 == null || StringUtils.isBlank(d2.getValue())) {
|
||||||
|
return d1;
|
||||||
|
}
|
||||||
|
|
||||||
return Stream
|
return Stream
|
||||||
.of(d1, d2)
|
.of(d1, d2)
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.min(
|
.min(
|
||||||
Comparator
|
Comparator
|
||||||
.comparing(
|
.comparing(
|
||||||
|
|
|
@ -1,16 +1,18 @@
|
||||||
|
|
||||||
package eu.dnetlib.pace.tree;
|
package eu.dnetlib.pace.tree;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import com.wcohen.ss.AbstractStringDistance;
|
import com.wcohen.ss.AbstractStringDistance;
|
||||||
|
|
||||||
import eu.dnetlib.pace.config.Config;
|
import eu.dnetlib.pace.config.Config;
|
||||||
import eu.dnetlib.pace.model.Person;
|
import eu.dnetlib.pace.model.Person;
|
||||||
import eu.dnetlib.pace.tree.support.AbstractListComparator;
|
import eu.dnetlib.pace.tree.support.AbstractListComparator;
|
||||||
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
||||||
|
import eu.dnetlib.pace.util.AuthorMatchers;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@ComparatorClass("authorsMatch")
|
@ComparatorClass("authorsMatch")
|
||||||
public class AuthorsMatch extends AbstractListComparator {
|
public class AuthorsMatch extends AbstractListComparator {
|
||||||
|
@ -41,24 +43,36 @@ public class AuthorsMatch extends AbstractListComparator {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double compare(final List<String> a, final List<String> b, final Config conf) {
|
public double compare(final List<String> left, final List<String> right, final Config conf) {
|
||||||
if (a.isEmpty() || b.isEmpty())
|
if (left.isEmpty() || right.isEmpty())
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (a.size() > SIZE_THRESHOLD || b.size() > SIZE_THRESHOLD)
|
if (left.size() > SIZE_THRESHOLD || right.size() > SIZE_THRESHOLD)
|
||||||
return 1.0;
|
return 1.0;
|
||||||
|
|
||||||
int maxMiss = Integer.MAX_VALUE;
|
|
||||||
List<Person> bList = b.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
|
|
||||||
|
|
||||||
Double threshold = getDoubleParam("threshold");
|
Double threshold = getDoubleParam("threshold");
|
||||||
|
int maxMiss = Integer.MAX_VALUE;
|
||||||
|
|
||||||
if (threshold != null && threshold >= 0.0 && threshold <= 1.0 && a.size() == b.size()) {
|
if (threshold != null && threshold >= 0.0 && threshold <= 1.0 && left.size() == right.size()) {
|
||||||
maxMiss = (int) Math.floor((1 - threshold) * Math.max(a.size(), b.size()));
|
maxMiss = (int) Math.floor((1 - threshold) * Math.max(left.size(), right.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
int common = 0;
|
int common = 0;
|
||||||
|
|
||||||
|
List<String> a = new ArrayList<>(left);
|
||||||
|
List<String> b = new ArrayList<>(right);
|
||||||
|
|
||||||
|
common += AuthorMatchers
|
||||||
|
.removeMatches(a, b, (BiFunction<String, String, Object>) AuthorMatchers::matchEqualsIgnoreCase)
|
||||||
|
.size() / 2;
|
||||||
|
common += AuthorMatchers
|
||||||
|
.removeMatches(a, b, (BiFunction<String, String, Object>) AuthorMatchers::matchOrderedTokenAndAbbreviations)
|
||||||
|
.size() / 2;
|
||||||
|
|
||||||
|
List<Person> bList = b.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
|
||||||
|
|
||||||
// compare each element of List1 with each element of List2
|
// compare each element of List1 with each element of List2
|
||||||
|
int alreadyMatched = common;
|
||||||
for (int i = 0; i < a.size(); i++) {
|
for (int i = 0; i < a.size(); i++) {
|
||||||
Person p1 = new Person(a.get(i), false);
|
Person p1 = new Person(a.get(i), false);
|
||||||
|
|
||||||
|
@ -123,13 +137,13 @@ public class AuthorsMatch extends AbstractListComparator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i - common > maxMiss) {
|
if (i - common - alreadyMatched > maxMiss) {
|
||||||
return 0.0;
|
return 0.0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// normalization factor to compute the score
|
// normalization factor to compute the score
|
||||||
int normFactor = a.size() == b.size() ? a.size() : (a.size() + b.size() - common);
|
int normFactor = left.size() == right.size() ? left.size() : (left.size() + right.size() - common);
|
||||||
|
|
||||||
if (TYPE.equals("percentage")) {
|
if (TYPE.equals("percentage")) {
|
||||||
return (double) common / normFactor;
|
return (double) common / normFactor;
|
||||||
|
@ -160,5 +174,4 @@ public class AuthorsMatch extends AbstractListComparator {
|
||||||
public String normalization(String s) {
|
public String normalization(String s) {
|
||||||
return normalize(utf8(cleanup(s)));
|
return normalize(utf8(cleanup(s)));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
package eu.dnetlib.dhp.enrich.orcid
|
package eu.dnetlib.pace.util
|
||||||
|
|
||||||
import java.util.Locale
|
import java.util.Locale
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
|
import scala.util.control.Breaks.{break, breakable}
|
||||||
|
|
||||||
object ORCIDAuthorMatchers {
|
object AuthorMatchers {
|
||||||
val SPLIT_REGEX = Pattern.compile("[\\s,\\.]+")
|
val SPLIT_REGEX = Pattern.compile("[\\s,\\.]+")
|
||||||
|
|
||||||
val WORD_DIFF = 2
|
val WORD_DIFF = 2
|
||||||
|
@ -45,6 +46,7 @@ object ORCIDAuthorMatchers {
|
||||||
var res: Boolean = false
|
var res: Boolean = false
|
||||||
if (e1.length != 1 && e2.length != 1) {
|
if (e1.length != 1 && e2.length != 1) {
|
||||||
res = e1 == e2
|
res = e1 == e2
|
||||||
|
if (res)
|
||||||
longMatches += 1
|
longMatches += 1
|
||||||
} else {
|
} else {
|
||||||
res = true
|
res = true
|
||||||
|
@ -62,4 +64,49 @@ object ORCIDAuthorMatchers {
|
||||||
}
|
}
|
||||||
longMatches > 0 && (shortMatches + longMatches) == Math.min(p1.length, p2.length)
|
longMatches > 0 && (shortMatches + longMatches) == Math.min(p1.length, p2.length)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def removeMatches(
|
||||||
|
graph_authors: java.util.List[String],
|
||||||
|
orcid_authors: java.util.List[String],
|
||||||
|
matchingFunc: java.util.function.BiFunction[String,String,Boolean]
|
||||||
|
) : java.util.List[String] = {
|
||||||
|
removeMatches(graph_authors, orcid_authors, (a, b) => matchingFunc(a,b))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def removeMatches(
|
||||||
|
graph_authors: java.util.List[String],
|
||||||
|
orcid_authors: java.util.List[String],
|
||||||
|
matchingFunc: (String, String) => Boolean
|
||||||
|
) : java.util.List[String] = {
|
||||||
|
val matched = new java.util.ArrayList[String]()
|
||||||
|
|
||||||
|
if (graph_authors != null && !graph_authors.isEmpty) {
|
||||||
|
val ait = graph_authors.iterator
|
||||||
|
|
||||||
|
while (ait.hasNext) {
|
||||||
|
val author = ait.next()
|
||||||
|
val oit = orcid_authors.iterator
|
||||||
|
|
||||||
|
breakable {
|
||||||
|
while (oit.hasNext) {
|
||||||
|
val orcid = oit.next()
|
||||||
|
|
||||||
|
if (matchingFunc(author, orcid)) {
|
||||||
|
ait.remove()
|
||||||
|
oit.remove()
|
||||||
|
|
||||||
|
matched.add(author)
|
||||||
|
matched.add(orcid)
|
||||||
|
|
||||||
|
break()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
matched
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,84 @@
|
||||||
|
package eu.dnetlib.dhp.collection.plugin.rest;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||||
|
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
||||||
|
|
||||||
|
public class OsfPreprintCollectorTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(OsfPreprintCollectorTest.class);
|
||||||
|
|
||||||
|
private final String baseUrl = "https://api.osf.io/v2/preprints/";
|
||||||
|
|
||||||
|
// private final String requestHeaderMap = "";
|
||||||
|
// private final String authMethod = "";
|
||||||
|
// private final String authToken = "";
|
||||||
|
// private final String resultOutputFormat = "";
|
||||||
|
|
||||||
|
private final String queryParams = "filter:is_published:d=true";
|
||||||
|
|
||||||
|
private final String entityXpath = "/*/*[local-name()='data']";
|
||||||
|
|
||||||
|
private final String resultTotalXpath = "/*/*[local-name()='links']/*[local-name()='meta']/*[local-name()='total']";
|
||||||
|
|
||||||
|
private final String resumptionParam = "page";
|
||||||
|
private final String resumptionType = "page";
|
||||||
|
private final String resumptionXpath = "/*/*[local-name()='links']/*[local-name()='next']";
|
||||||
|
|
||||||
|
private final String resultSizeParam = "";
|
||||||
|
private final String resultSizeValue = "";
|
||||||
|
|
||||||
|
private final String resultFormatParam = "format";
|
||||||
|
private final String resultFormatValue = "json";
|
||||||
|
|
||||||
|
private final ApiDescriptor api = new ApiDescriptor();
|
||||||
|
private RestCollectorPlugin rcp;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() {
|
||||||
|
final HashMap<String, String> params = new HashMap<>();
|
||||||
|
params.put("resumptionType", this.resumptionType);
|
||||||
|
params.put("resumptionParam", this.resumptionParam);
|
||||||
|
params.put("resumptionXpath", this.resumptionXpath);
|
||||||
|
params.put("resultTotalXpath", this.resultTotalXpath);
|
||||||
|
params.put("resultFormatParam", this.resultFormatParam);
|
||||||
|
params.put("resultFormatValue", this.resultFormatValue);
|
||||||
|
params.put("resultSizeParam", this.resultSizeParam);
|
||||||
|
params.put("resultSizeValue", this.resultSizeValue);
|
||||||
|
params.put("queryParams", this.queryParams);
|
||||||
|
params.put("entityXpath", this.entityXpath);
|
||||||
|
|
||||||
|
this.api.setBaseUrl(this.baseUrl);
|
||||||
|
this.api.setParams(params);
|
||||||
|
|
||||||
|
this.rcp = new RestCollectorPlugin(new HttpClientParams());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Disabled
|
||||||
|
void test() throws CollectorException {
|
||||||
|
final AtomicInteger i = new AtomicInteger(0);
|
||||||
|
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
|
||||||
|
|
||||||
|
stream.limit(200).forEach(s -> {
|
||||||
|
Assertions.assertTrue(s.length() > 0);
|
||||||
|
i.incrementAndGet();
|
||||||
|
log.info(s);
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("{}", i.intValue());
|
||||||
|
Assertions.assertTrue(i.intValue() > 0);
|
||||||
|
}
|
||||||
|
}
|
|
@ -43,15 +43,13 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
|
||||||
import eu.dnetlib.dhp.schema.sx.OafUtils;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
public class SparkDedupTest implements Serializable {
|
public class SparkDedupTest implements Serializable {
|
||||||
|
static final boolean CHECK_CARDINALITIES = true;
|
||||||
|
|
||||||
@Mock(serializable = true)
|
@Mock(serializable = true)
|
||||||
ISLookUpService isLookUpService;
|
ISLookUpService isLookUpService;
|
||||||
|
@ -191,12 +189,13 @@ public class SparkDedupTest implements Serializable {
|
||||||
System.out.println("ds_simrel = " + ds_simrel);
|
System.out.println("ds_simrel = " + ds_simrel);
|
||||||
System.out.println("orp_simrel = " + orp_simrel);
|
System.out.println("orp_simrel = " + orp_simrel);
|
||||||
|
|
||||||
|
if (CHECK_CARDINALITIES) {
|
||||||
assertEquals(751, orgs_simrel);
|
assertEquals(751, orgs_simrel);
|
||||||
assertEquals(546, pubs_simrel);
|
assertEquals(566, pubs_simrel);
|
||||||
assertEquals(113, sw_simrel);
|
assertEquals(113, sw_simrel);
|
||||||
assertEquals(148, ds_simrel);
|
assertEquals(148, ds_simrel);
|
||||||
assertEquals(280, orp_simrel);
|
assertEquals(280, orp_simrel);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -239,21 +238,27 @@ public class SparkDedupTest implements Serializable {
|
||||||
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct"))
|
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct"))
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
// entities simrels supposed to be equal to the number of previous step (no rels in whitelist)
|
|
||||||
assertEquals(751, orgs_simrel);
|
|
||||||
assertEquals(546, pubs_simrel);
|
|
||||||
assertEquals(148, ds_simrel);
|
|
||||||
assertEquals(280, orp_simrel);
|
|
||||||
// System.out.println("orgs_simrel = " + orgs_simrel);
|
|
||||||
// System.out.println("pubs_simrel = " + pubs_simrel);
|
|
||||||
// System.out.println("ds_simrel = " + ds_simrel);
|
|
||||||
// System.out.println("orp_simrel = " + orp_simrel);
|
|
||||||
|
|
||||||
// entities simrels to be different from the number of previous step (new simrels in the whitelist)
|
// entities simrels to be different from the number of previous step (new simrels in the whitelist)
|
||||||
Dataset<Row> sw_simrel = spark
|
Dataset<Row> sw_simrel = spark
|
||||||
.read()
|
.read()
|
||||||
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"));
|
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"));
|
||||||
|
|
||||||
|
System.out.println("orgs_simrel = " + orgs_simrel);
|
||||||
|
System.out.println("pubs_simrel = " + pubs_simrel);
|
||||||
|
System.out.println("ds_simrel = " + ds_simrel);
|
||||||
|
System.out.println("orp_simrel = " + orp_simrel);
|
||||||
|
System.out.println("sw_simrel = " + sw_simrel.count());
|
||||||
|
|
||||||
|
// entities simrels supposed to be equal to the number of previous step (no rels in whitelist)
|
||||||
|
if (CHECK_CARDINALITIES) {
|
||||||
|
assertEquals(751, orgs_simrel);
|
||||||
|
assertEquals(566, pubs_simrel);
|
||||||
|
assertEquals(148, ds_simrel);
|
||||||
|
assertEquals(280, orp_simrel);
|
||||||
|
assertEquals(115, sw_simrel.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// check if the first relation in the whitelist exists
|
// check if the first relation in the whitelist exists
|
||||||
assertTrue(
|
assertTrue(
|
||||||
sw_simrel
|
sw_simrel
|
||||||
|
@ -272,10 +277,6 @@ public class SparkDedupTest implements Serializable {
|
||||||
rel -> rel.getSource().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[0])
|
rel -> rel.getSource().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[0])
|
||||||
&& rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1]))
|
&& rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1]))
|
||||||
.count() > 0);
|
.count() > 0);
|
||||||
|
|
||||||
assertEquals(115, sw_simrel.count());
|
|
||||||
// System.out.println("sw_simrel = " + sw_simrel.count());
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -466,17 +467,19 @@ public class SparkDedupTest implements Serializable {
|
||||||
assertTrue(dups.contains(r.getSource()));
|
assertTrue(dups.contains(r.getSource()));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
System.out.println("orgs_mergerel = " + orgs_mergerel);
|
||||||
|
System.out.println("pubs_mergerel = " + pubs.count());
|
||||||
|
System.out.println("sw_mergerel = " + sw_mergerel);
|
||||||
|
System.out.println("ds_mergerel = " + ds_mergerel);
|
||||||
|
System.out.println("orp_mergerel = " + orp_mergerel);
|
||||||
|
|
||||||
|
if (CHECK_CARDINALITIES) {
|
||||||
assertEquals(1268, orgs_mergerel);
|
assertEquals(1268, orgs_mergerel);
|
||||||
assertEquals(1112, pubs.count());
|
assertEquals(1156, pubs.count());
|
||||||
assertEquals(292, sw_mergerel);
|
assertEquals(292, sw_mergerel);
|
||||||
assertEquals(476, ds_mergerel);
|
assertEquals(476, ds_mergerel);
|
||||||
assertEquals(742, orp_mergerel);
|
assertEquals(742, orp_mergerel);
|
||||||
// System.out.println("orgs_mergerel = " + orgs_mergerel);
|
}
|
||||||
// System.out.println("pubs_mergerel = " + pubs_mergerel);
|
|
||||||
// System.out.println("sw_mergerel = " + sw_mergerel);
|
|
||||||
// System.out.println("ds_mergerel = " + ds_mergerel);
|
|
||||||
// System.out.println("orp_mergerel = " + orp_mergerel);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -552,17 +555,19 @@ public class SparkDedupTest implements Serializable {
|
||||||
assertTrue(dups.contains(r.getSource()));
|
assertTrue(dups.contains(r.getSource()));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
System.out.println("orgs_mergerel = " + orgs_mergerel);
|
||||||
|
System.out.println("pubs_mergerel = " + pubs.count());
|
||||||
|
System.out.println("sw_mergerel = " + sw_mergerel);
|
||||||
|
System.out.println("ds_mergerel = " + ds_mergerel);
|
||||||
|
System.out.println("orp_mergerel = " + orp_mergerel);
|
||||||
|
|
||||||
|
if (CHECK_CARDINALITIES) {
|
||||||
assertEquals(1268, orgs_mergerel);
|
assertEquals(1268, orgs_mergerel);
|
||||||
assertEquals(1112, pubs.count());
|
assertEquals(1156, pubs.count());
|
||||||
assertEquals(292, sw_mergerel);
|
assertEquals(292, sw_mergerel);
|
||||||
assertEquals(476, ds_mergerel);
|
assertEquals(476, ds_mergerel);
|
||||||
assertEquals(742, orp_mergerel);
|
assertEquals(742, orp_mergerel);
|
||||||
// System.out.println("orgs_mergerel = " + orgs_mergerel);
|
}
|
||||||
// System.out.println("pubs_mergerel = " + pubs_mergerel);
|
|
||||||
// System.out.println("sw_mergerel = " + sw_mergerel);
|
|
||||||
// System.out.println("ds_mergerel = " + ds_mergerel);
|
|
||||||
// System.out.println("orp_mergerel = " + orp_mergerel);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -607,19 +612,21 @@ public class SparkDedupTest implements Serializable {
|
||||||
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
|
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
|
System.out.println("orgs_deduprecord = " + orgs_deduprecord);
|
||||||
|
System.out.println("pubs_deduprecord = " + pubs.count());
|
||||||
|
System.out.println("sw_deduprecord = " + sw_deduprecord);
|
||||||
|
System.out.println("ds_deduprecord = " + ds_deduprecord);
|
||||||
|
System.out.println("orp_deduprecord = " + orp_deduprecord);
|
||||||
|
|
||||||
|
if (CHECK_CARDINALITIES) {
|
||||||
assertEquals(86, orgs_deduprecord);
|
assertEquals(86, orgs_deduprecord);
|
||||||
assertEquals(91, pubs.count());
|
assertEquals(96, pubs.count());
|
||||||
assertEquals(47, sw_deduprecord);
|
assertEquals(47, sw_deduprecord);
|
||||||
assertEquals(97, ds_deduprecord);
|
assertEquals(97, ds_deduprecord);
|
||||||
assertEquals(92, orp_deduprecord);
|
assertEquals(92, orp_deduprecord);
|
||||||
|
}
|
||||||
|
|
||||||
verifyRoot_1(mapper, pubs);
|
verifyRoot_1(mapper, pubs);
|
||||||
|
|
||||||
// System.out.println("orgs_deduprecord = " + orgs_deduprecord);
|
|
||||||
// System.out.println("pubs_deduprecord = " + pubs_deduprecord);
|
|
||||||
// System.out.println("sw_deduprecord = " + sw_deduprecord);
|
|
||||||
// System.out.println("ds_deduprecord = " + ds_deduprecord);
|
|
||||||
// System.out.println("orp_deduprecord = " + orp_deduprecord);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void verifyRoot_1(ObjectMapper mapper, Dataset<Publication> pubs) {
|
private static void verifyRoot_1(ObjectMapper mapper, Dataset<Publication> pubs) {
|
||||||
|
@ -745,21 +752,23 @@ public class SparkDedupTest implements Serializable {
|
||||||
.distinct()
|
.distinct()
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
assertEquals(925, publications);
|
System.out.println("publications = " + publications);
|
||||||
|
System.out.println("organizations = " + organizations);
|
||||||
|
System.out.println("projects = " + projects);
|
||||||
|
System.out.println("datasource = " + datasource);
|
||||||
|
System.out.println("software = " + softwares);
|
||||||
|
System.out.println("dataset = " + dataset);
|
||||||
|
System.out.println("otherresearchproduct = " + otherresearchproduct);
|
||||||
|
|
||||||
|
if (CHECK_CARDINALITIES) {
|
||||||
|
assertEquals(930, publications);
|
||||||
assertEquals(839, organizations);
|
assertEquals(839, organizations);
|
||||||
assertEquals(100, projects);
|
assertEquals(100, projects);
|
||||||
assertEquals(100, datasource);
|
assertEquals(100, datasource);
|
||||||
assertEquals(196, softwares);
|
assertEquals(196, softwares);
|
||||||
assertEquals(389, dataset);
|
assertEquals(389, dataset);
|
||||||
assertEquals(520, otherresearchproduct);
|
assertEquals(520, otherresearchproduct);
|
||||||
|
}
|
||||||
// System.out.println("publications = " + publications);
|
|
||||||
// System.out.println("organizations = " + organizations);
|
|
||||||
// System.out.println("projects = " + projects);
|
|
||||||
// System.out.println("datasource = " + datasource);
|
|
||||||
// System.out.println("software = " + softwares);
|
|
||||||
// System.out.println("dataset = " + dataset);
|
|
||||||
// System.out.println("otherresearchproduct = " + otherresearchproduct);
|
|
||||||
|
|
||||||
long deletedOrgs = jsc
|
long deletedOrgs = jsc
|
||||||
.textFile(testDedupGraphBasePath + "/organization")
|
.textFile(testDedupGraphBasePath + "/organization")
|
||||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.enrich.orcid
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
|
import eu.dnetlib.dhp.schema.oaf.{Author, StructuredProperty}
|
||||||
import eu.dnetlib.dhp.schema.sx.OafUtils
|
import eu.dnetlib.dhp.schema.sx.OafUtils
|
||||||
|
import eu.dnetlib.pace.util.AuthorMatchers
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
import scala.beans.BeanProperty
|
import scala.beans.BeanProperty
|
||||||
|
@ -39,7 +40,7 @@ object ORCIDAuthorEnricher extends Serializable {
|
||||||
unmatched_authors,
|
unmatched_authors,
|
||||||
orcid_authors,
|
orcid_authors,
|
||||||
(author, orcid) =>
|
(author, orcid) =>
|
||||||
ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||||
"fullName"
|
"fullName"
|
||||||
) ++
|
) ++
|
||||||
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
|
// Look after exact reversed fullname match, reconstruct ORCID fullname as familyName + givenName
|
||||||
|
@ -47,7 +48,7 @@ object ORCIDAuthorEnricher extends Serializable {
|
||||||
unmatched_authors,
|
unmatched_authors,
|
||||||
orcid_authors,
|
orcid_authors,
|
||||||
(author, orcid) =>
|
(author, orcid) =>
|
||||||
ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
|
AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.familyName + " " + orcid.givenName),
|
||||||
"reversedFullName"
|
"reversedFullName"
|
||||||
) ++
|
) ++
|
||||||
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
|
// split author names in tokens, order the tokens, then check for matches of full tokens or abbreviations
|
||||||
|
@ -55,7 +56,7 @@ object ORCIDAuthorEnricher extends Serializable {
|
||||||
unmatched_authors,
|
unmatched_authors,
|
||||||
orcid_authors,
|
orcid_authors,
|
||||||
(author, orcid) =>
|
(author, orcid) =>
|
||||||
ORCIDAuthorMatchers
|
AuthorMatchers
|
||||||
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
.matchOrderedTokenAndAbbreviations(author.getFullname, orcid.givenName + " " + orcid.familyName),
|
||||||
"orderedTokens"
|
"orderedTokens"
|
||||||
) ++
|
) ++
|
||||||
|
@ -63,7 +64,7 @@ object ORCIDAuthorEnricher extends Serializable {
|
||||||
extractAndEnrichMatches(
|
extractAndEnrichMatches(
|
||||||
unmatched_authors,
|
unmatched_authors,
|
||||||
orcid_authors,
|
orcid_authors,
|
||||||
(author, orcid) => ORCIDAuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
|
(author, orcid) => AuthorMatchers.matchEqualsIgnoreCase(author.getFullname, orcid.creditName),
|
||||||
"creditName"
|
"creditName"
|
||||||
) ++
|
) ++
|
||||||
// look after exact matches in ORCID otherNames
|
// look after exact matches in ORCID otherNames
|
||||||
|
@ -71,7 +72,7 @@ object ORCIDAuthorEnricher extends Serializable {
|
||||||
unmatched_authors,
|
unmatched_authors,
|
||||||
orcid_authors,
|
orcid_authors,
|
||||||
(author, orcid) =>
|
(author, orcid) =>
|
||||||
orcid.otherNames != null && ORCIDAuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
|
orcid.otherNames != null && AuthorMatchers.matchOtherNames(author.getFullname, orcid.otherNames.asScala),
|
||||||
"otherNames"
|
"otherNames"
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package eu.dnetlib.dhp.enrich.orcid
|
package eu.dnetlib.dhp.enrich.orcid
|
||||||
|
|
||||||
import eu.dnetlib.dhp.enrich.orcid.ORCIDAuthorMatchers.matchOrderedTokenAndAbbreviations
|
import eu.dnetlib.pace.util.AuthorMatchers.matchOrderedTokenAndAbbreviations
|
||||||
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
|
|
|
@ -8,9 +8,13 @@ fi
|
||||||
|
|
||||||
export HADOOP_USER_NAME=$2
|
export HADOOP_USER_NAME=$2
|
||||||
|
|
||||||
|
|
||||||
|
# Set the active HDFS node of OCEAN and IMPALA cluster.
|
||||||
|
OCEAN_HDFS_NODE='hdfs://nameservice1'
|
||||||
|
echo -e "\nOCEAN HDFS virtual-name which resolves automatically to the active-node: ${OCEAN_HDFS_NODE}"
|
||||||
|
|
||||||
IMPALA_HDFS_NODE=''
|
IMPALA_HDFS_NODE=''
|
||||||
COUNTER=0
|
COUNTER=0
|
||||||
|
|
||||||
while [ $COUNTER -lt 3 ]; do
|
while [ $COUNTER -lt 3 ]; do
|
||||||
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
||||||
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
||||||
|
@ -24,71 +28,195 @@ while [ $COUNTER -lt 3 ]; do
|
||||||
fi
|
fi
|
||||||
((COUNTER++))
|
((COUNTER++))
|
||||||
done
|
done
|
||||||
|
|
||||||
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
||||||
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
|
echo -e "\n\nERROR: PROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! | AFTER ${COUNTER} RETRIES.\n\n"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
|
echo -e "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries.\n\n"
|
||||||
|
|
||||||
|
IMPALA_HOSTNAME='impala-cluster-dn1.openaire.eu'
|
||||||
|
IMPALA_CONFIG_FILE='/etc/impala_cluster/hdfs-site.xml'
|
||||||
|
|
||||||
|
IMPALA_HDFS_DB_BASE_PATH="${IMPALA_HDFS_NODE}/user/hive/warehouse"
|
||||||
|
|
||||||
|
|
||||||
|
# Set sed arguments.
|
||||||
|
LOCATION_HDFS_NODE_SED_ARG="s|${OCEAN_HDFS_NODE}|${IMPALA_HDFS_NODE}|g" # This requires to be used with "sed -e" in order to have the "|" delimiter (as the "/" conflicts with the URIs)
|
||||||
|
|
||||||
|
# Set the SED command arguments for column-names with reserved words:
|
||||||
|
DATE_SED_ARG_1='s/[[:space:]]\date[[:space:]]/\`date\`/g'
|
||||||
|
DATE_SED_ARG_2='s/\.date,/\.\`date\`,/g' # the "date" may be part of a larger field name like "datestamp" or "date_aggregated", so we need to be careful with what we are replacing.
|
||||||
|
DATE_SED_ARG_3='s/\.date[[:space:]]/\.\`date\` /g'
|
||||||
|
|
||||||
|
HASH_SED_ARG_1='s/[[:space:]]\hash[[:space:]]/\`hash\`/g'
|
||||||
|
HASH_SED_ARG_2='s/\.hash,/\.\`hash\`,/g'
|
||||||
|
HASH_SED_ARG_3='s/\.hash[[:space:]]/\.\`hash\` /g'
|
||||||
|
|
||||||
|
LOCATION_SED_ARG_1='s/[[:space:]]\location[[:space:]]/\`location\`/g'
|
||||||
|
LOCATION_SED_ARG_2='s/\.location,/\.\`location\`,/g'
|
||||||
|
LOCATION_SED_ARG_3='s/\.location[[:space:]]/\.\`location\` /g'
|
||||||
|
|
||||||
|
|
||||||
function copydb() {
|
function copydb() {
|
||||||
|
|
||||||
|
|
||||||
db=$1
|
db=$1
|
||||||
FILE=("hive_wf_tmp_"$RANDOM)
|
echo -e "\nStart processing db: '${db}'..\n"
|
||||||
hdfs dfs -mkdir ${IMPALA_HDFS_NODE}/tmp/$FILE/
|
|
||||||
|
|
||||||
# change ownership to impala
|
# Delete the old DB from Impala cluster (if exists).
|
||||||
# hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/${db}.db
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "drop database if exists ${db} cascade" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Make Impala aware of the deletion of the old DB immediately.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
|
||||||
# copy the databases from ocean to impala
|
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
|
||||||
echo "copying $db"
|
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
|
||||||
hadoop distcp -Dmapreduce.map.memory.mb=6144 -pb hdfs://nameservice1/user/hive/warehouse/${db}.db ${IMPALA_HDFS_NODE}/tmp/$FILE/
|
# Using max memory of: 50 * 6144 = 300 Gb
|
||||||
|
# Using 1MB as a buffer-size.
|
||||||
|
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
|
||||||
|
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
|
||||||
|
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
|
||||||
|
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
|
||||||
|
-numListstatusThreads 40 \
|
||||||
|
-copybuffersize 1048576 \
|
||||||
|
-strategy dynamic \
|
||||||
|
-pb \
|
||||||
|
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
|
||||||
|
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/${db}.db
|
# Check the exit status of the "hadoop distcp" command.
|
||||||
|
if [ $? -eq 0 ]; then
|
||||||
|
echo -e "\nSuccessfully copied the files of '${db}'.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 2
|
||||||
|
fi
|
||||||
|
|
||||||
# drop tables from db
|
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db
|
||||||
do
|
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop table $i;"`;
|
echo -e "\nCreating schema for db: '${db}'\n"
|
||||||
|
|
||||||
|
# create the new database (with the same name)
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
|
||||||
|
|
||||||
|
# Make Impala aware of the creation of the new DB immediately.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
|
||||||
|
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
|
||||||
|
|
||||||
|
all_create_view_statements=()
|
||||||
|
|
||||||
|
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
|
||||||
|
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
|
# Check if this is a view by showing the create-statement where it should print "create view" for a view, not the "create table". Unfortunately, there is no "show views" command.
|
||||||
|
create_entity_statement=`hive -e "show create table ${db}.${i};"` # It needs to happen in two stages, otherwise the "grep" is not able to match multi-line statement.
|
||||||
|
|
||||||
|
create_view_statement_test=`echo -e "$create_entity_statement" | grep 'CREATE VIEW'`
|
||||||
|
if [ -n "$create_view_statement_test" ]; then
|
||||||
|
echo -e "\n'${i}' is a view, so we will save its 'create view' statement and execute it on Impala, after all tables have been created.\n"
|
||||||
|
create_view_statement=`echo -e "$create_entity_statement" | sed 's/WARN:.*//g' | sed 's/\`//g' \
|
||||||
|
| sed 's/"$/;/' | sed 's/^"//' | sed 's/\\"\\"/\"/g' | sed -e "${LOCATION_HDFS_NODE_SED_ARG}" | sed "${DATE_SED_ARG_1}" | sed "${HASH_SED_ARG_1}" | sed "${LOCATION_SED_ARG_1}" \
|
||||||
|
| sed "${DATE_SED_ARG_2}" | sed "${HASH_SED_ARG_2}" | sed "${LOCATION_SED_ARG_2}" \
|
||||||
|
| sed "${DATE_SED_ARG_3}" | sed "${HASH_SED_ARG_3}" | sed "${LOCATION_SED_ARG_3}"`
|
||||||
|
all_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
|
||||||
|
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
|
||||||
|
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
|
||||||
|
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
|
||||||
|
else
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
|
||||||
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# drop views from db
|
echo -e "\nAll tables have been created, going to create the views..\n"
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
|
||||||
do
|
# Time to loop through the views and create them.
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop view $i;"`;
|
# At this point all table-schemas should have been created.
|
||||||
|
|
||||||
|
previous_num_of_views_to_retry=${#all_create_view_statements}
|
||||||
|
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
|
||||||
|
# Make Impala aware of the new tables, so it knows them when creating the views.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
else
|
||||||
|
echo -e "\nDB '${db}' does not contain any views.\n"
|
||||||
|
fi
|
||||||
|
|
||||||
|
level_counter=0
|
||||||
|
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
|
||||||
|
((level_counter++))
|
||||||
|
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
|
||||||
|
# In this case, we should retry creating this particular view again.
|
||||||
|
should_retry_create_view_statements=()
|
||||||
|
|
||||||
|
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
|
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
|
||||||
|
if [ -n "$specific_errors" ]; then
|
||||||
|
echo -e "\nspecific_errors: ${specific_errors}\n"
|
||||||
|
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
|
||||||
|
should_retry_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# delete the database
|
new_num_of_views_to_retry=${#should_retry_create_view_statements}
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -q "drop database if exists ${db} cascade";
|
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
|
||||||
|
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
|
||||||
# create the databases
|
return 3
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -q "create database ${db}";
|
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
|
||||||
impala-shell -q "INVALIDATE METADATA"
|
previous_num_of_views_to_retry=$new_num_of_views_to_retry
|
||||||
echo "creating schema for ${db}"
|
else
|
||||||
for (( k = 0; k < 5; k ++ )); do
|
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
|
||||||
for i in `impala-shell -d ${db} --delimited -q "show tables"`;
|
fi
|
||||||
do
|
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
|
||||||
impala-shell -d ${db} --delimited -q "show create table $i";
|
|
||||||
done | sed 's/"$/;/' | sed 's/^"//' | sed 's/[[:space:]]\date[[:space:]]/`date`/g' | impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -c -f -
|
|
||||||
done
|
done
|
||||||
|
|
||||||
# load the data from /tmp in the respective tables
|
sleep 1
|
||||||
echo "copying data in tables and computing stats"
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
sleep 1
|
||||||
do
|
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "load data inpath '/tmp/$FILE/${db}.db/$i' into table $i";
|
echo -e "\nComputing stats for tables..\n"
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "compute stats $i";
|
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
|
||||||
|
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
|
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
|
||||||
|
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
|
||||||
|
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# deleting the remaining directory from hdfs
|
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -rm -R /tmp/$FILE/${db}.db
|
echo -e "\nAll entities have been copied to Impala cluster.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 4
|
||||||
|
fi
|
||||||
|
|
||||||
|
rm -f error.log
|
||||||
|
echo -e "\n\nFinished processing db: ${db}\n\n"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MONITOR_DB=$1
|
MONITOR_DB=$1
|
||||||
#HADOOP_USER_NAME=$2
|
#HADOOP_USER_NAME=$2
|
||||||
copydb $MONITOR_DB
|
copydb $MONITOR_DB
|
||||||
|
|
|
@ -8,9 +8,12 @@ fi
|
||||||
|
|
||||||
export HADOOP_USER_NAME=$2
|
export HADOOP_USER_NAME=$2
|
||||||
|
|
||||||
|
# Set the active HDFS node of OCEAN and IMPALA cluster.
|
||||||
|
OCEAN_HDFS_NODE='hdfs://nameservice1'
|
||||||
|
echo -e "\nOCEAN HDFS virtual-name which resolves automatically to the active-node: ${OCEAN_HDFS_NODE}"
|
||||||
|
|
||||||
IMPALA_HDFS_NODE=''
|
IMPALA_HDFS_NODE=''
|
||||||
COUNTER=0
|
COUNTER=0
|
||||||
|
|
||||||
while [ $COUNTER -lt 3 ]; do
|
while [ $COUNTER -lt 3 ]; do
|
||||||
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
||||||
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
||||||
|
@ -24,70 +27,195 @@ while [ $COUNTER -lt 3 ]; do
|
||||||
fi
|
fi
|
||||||
((COUNTER++))
|
((COUNTER++))
|
||||||
done
|
done
|
||||||
|
|
||||||
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
||||||
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
|
echo -e "\n\nERROR: PROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! | AFTER ${COUNTER} RETRIES.\n\n"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
|
echo -e "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries.\n\n"
|
||||||
|
|
||||||
|
IMPALA_HOSTNAME='impala-cluster-dn1.openaire.eu'
|
||||||
|
IMPALA_CONFIG_FILE='/etc/impala_cluster/hdfs-site.xml'
|
||||||
|
|
||||||
|
IMPALA_HDFS_DB_BASE_PATH="${IMPALA_HDFS_NODE}/user/hive/warehouse"
|
||||||
|
|
||||||
|
|
||||||
|
# Set sed arguments.
|
||||||
|
LOCATION_HDFS_NODE_SED_ARG="s|${OCEAN_HDFS_NODE}|${IMPALA_HDFS_NODE}|g" # This requires to be used with "sed -e" in order to have the "|" delimiter (as the "/" conflicts with the URIs)
|
||||||
|
|
||||||
|
# Set the SED command arguments for column-names with reserved words:
|
||||||
|
DATE_SED_ARG_1='s/[[:space:]]\date[[:space:]]/\`date\`/g'
|
||||||
|
DATE_SED_ARG_2='s/\.date,/\.\`date\`,/g' # the "date" may be part of a larger field name like "datestamp" or "date_aggregated", so we need to be careful with what we are replacing.
|
||||||
|
DATE_SED_ARG_3='s/\.date[[:space:]]/\.\`date\` /g'
|
||||||
|
|
||||||
|
HASH_SED_ARG_1='s/[[:space:]]\hash[[:space:]]/\`hash\`/g'
|
||||||
|
HASH_SED_ARG_2='s/\.hash,/\.\`hash\`,/g'
|
||||||
|
HASH_SED_ARG_3='s/\.hash[[:space:]]/\.\`hash\` /g'
|
||||||
|
|
||||||
|
LOCATION_SED_ARG_1='s/[[:space:]]\location[[:space:]]/\`location\`/g'
|
||||||
|
LOCATION_SED_ARG_2='s/\.location,/\.\`location\`,/g'
|
||||||
|
LOCATION_SED_ARG_3='s/\.location[[:space:]]/\.\`location\` /g'
|
||||||
|
|
||||||
|
|
||||||
function copydb() {
|
function copydb() {
|
||||||
|
|
||||||
db=$1
|
db=$1
|
||||||
FILE=("hive_wf_tmp_"$RANDOM)
|
echo -e "\nStart processing db: '${db}'..\n"
|
||||||
hdfs dfs -mkdir ${IMPALA_HDFS_NODE}/tmp/$FILE/
|
|
||||||
|
|
||||||
# change ownership to impala
|
# Delete the old DB from Impala cluster (if exists).
|
||||||
# hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/${db}.db
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "drop database if exists ${db} cascade" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Make Impala aware of the deletion of the old DB immediately.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
|
||||||
# copy the databases from ocean to impala
|
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
|
||||||
echo "copying $db"
|
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
|
||||||
hadoop distcp -Dmapreduce.map.memory.mb=6144 -pb hdfs://nameservice1/user/hive/warehouse/${db}.db ${IMPALA_HDFS_NODE}/tmp/$FILE/
|
# Using max memory of: 50 * 6144 = 300 Gb
|
||||||
|
# Using 1MB as a buffer-size.
|
||||||
|
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
|
||||||
|
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
|
||||||
|
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
|
||||||
|
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
|
||||||
|
-numListstatusThreads 40 \
|
||||||
|
-copybuffersize 1048576 \
|
||||||
|
-strategy dynamic \
|
||||||
|
-pb \
|
||||||
|
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
|
||||||
|
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/${db}.db
|
# Check the exit status of the "hadoop distcp" command.
|
||||||
|
if [ $? -eq 0 ]; then
|
||||||
|
echo -e "\nSuccessfully copied the files of '${db}'.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 2
|
||||||
|
fi
|
||||||
|
|
||||||
# drop tables from db
|
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db
|
||||||
do
|
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop table $i;"`;
|
echo -e "\nCreating schema for db: '${db}'\n"
|
||||||
|
|
||||||
|
# create the new database (with the same name)
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
|
||||||
|
|
||||||
|
# Make Impala aware of the creation of the new DB immediately.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
|
||||||
|
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
|
||||||
|
|
||||||
|
all_create_view_statements=()
|
||||||
|
|
||||||
|
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
|
||||||
|
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
|
# Check if this is a view by showing the create-statement where it should print "create view" for a view, not the "create table". Unfortunately, there is no "show views" command.
|
||||||
|
create_entity_statement=`hive -e "show create table ${db}.${i};"` # It needs to happen in two stages, otherwise the "grep" is not able to match multi-line statement.
|
||||||
|
|
||||||
|
create_view_statement_test=`echo -e "$create_entity_statement" | grep 'CREATE VIEW'`
|
||||||
|
if [ -n "$create_view_statement_test" ]; then
|
||||||
|
echo -e "\n'${i}' is a view, so we will save its 'create view' statement and execute it on Impala, after all tables have been created.\n"
|
||||||
|
create_view_statement=`echo -e "$create_entity_statement" | sed 's/WARN:.*//g' | sed 's/\`//g' \
|
||||||
|
| sed 's/"$/;/' | sed 's/^"//' | sed 's/\\"\\"/\"/g' | sed -e "${LOCATION_HDFS_NODE_SED_ARG}" | sed "${DATE_SED_ARG_1}" | sed "${HASH_SED_ARG_1}" | sed "${LOCATION_SED_ARG_1}" \
|
||||||
|
| sed "${DATE_SED_ARG_2}" | sed "${HASH_SED_ARG_2}" | sed "${LOCATION_SED_ARG_2}" \
|
||||||
|
| sed "${DATE_SED_ARG_3}" | sed "${HASH_SED_ARG_3}" | sed "${LOCATION_SED_ARG_3}"`
|
||||||
|
all_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
|
||||||
|
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
|
||||||
|
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
|
||||||
|
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
|
||||||
|
else
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
|
||||||
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# drop views from db
|
echo -e "\nAll tables have been created, going to create the views..\n"
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
|
||||||
do
|
# Time to loop through the views and create them.
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop view $i;"`;
|
# At this point all table-schemas should have been created.
|
||||||
|
|
||||||
|
previous_num_of_views_to_retry=${#all_create_view_statements}
|
||||||
|
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
|
||||||
|
# Make Impala aware of the new tables, so it knows them when creating the views.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
else
|
||||||
|
echo -e "\nDB '${db}' does not contain any views.\n"
|
||||||
|
fi
|
||||||
|
|
||||||
|
level_counter=0
|
||||||
|
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
|
||||||
|
((level_counter++))
|
||||||
|
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
|
||||||
|
# In this case, we should retry creating this particular view again.
|
||||||
|
should_retry_create_view_statements=()
|
||||||
|
|
||||||
|
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
|
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
|
||||||
|
if [ -n "$specific_errors" ]; then
|
||||||
|
echo -e "\nspecific_errors: ${specific_errors}\n"
|
||||||
|
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
|
||||||
|
should_retry_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# delete the database
|
new_num_of_views_to_retry=${#should_retry_create_view_statements}
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -q "drop database if exists ${db} cascade";
|
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
|
||||||
|
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
|
||||||
# create the databases
|
return 3
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -q "create database ${db}";
|
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
|
||||||
impala-shell -q "INVALIDATE METADATA"
|
previous_num_of_views_to_retry=$new_num_of_views_to_retry
|
||||||
echo "creating schema for ${db}"
|
else
|
||||||
for (( k = 0; k < 5; k ++ )); do
|
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
|
||||||
for i in `impala-shell -d ${db} --delimited -q "show tables"`;
|
fi
|
||||||
do
|
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
|
||||||
impala-shell -d ${db} --delimited -q "show create table $i";
|
|
||||||
done | sed 's/"$/;/' | sed 's/^"//' | sed 's/[[:space:]]\date[[:space:]]/`date`/g' | impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -c -f -
|
|
||||||
done
|
done
|
||||||
|
|
||||||
# load the data from /tmp in the respective tables
|
sleep 1
|
||||||
echo "copying data in tables and computing stats"
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
sleep 1
|
||||||
do
|
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "load data inpath '/tmp/$FILE/${db}.db/$i' into table $i";
|
echo -e "\nComputing stats for tables..\n"
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "compute stats $i";
|
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
|
||||||
|
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
|
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
|
||||||
|
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
|
||||||
|
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# deleting the remaining directory from hdfs
|
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -rm -R /tmp/$FILE/${db}.db
|
echo -e "\nAll entities have been copied to Impala cluster.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 4
|
||||||
|
fi
|
||||||
|
|
||||||
|
rm -f error.log
|
||||||
|
echo -e "\n\nFinished processing db: ${db}\n\n"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MONITOR_DB=$1
|
MONITOR_DB=$1
|
||||||
#HADOOP_USER_NAME=$2
|
#HADOOP_USER_NAME=$2
|
||||||
copydb $MONITOR_DB
|
copydb $MONITOR_DB
|
||||||
|
|
|
@ -6,11 +6,14 @@ then
|
||||||
ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder}
|
ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder}
|
||||||
fi
|
fi
|
||||||
|
|
||||||
#export HADOOP_USER_NAME=$2
|
export HADOOP_USER_NAME=$2
|
||||||
|
|
||||||
|
# Set the active HDFS node of OCEAN and IMPALA cluster.
|
||||||
|
OCEAN_HDFS_NODE='hdfs://nameservice1'
|
||||||
|
echo -e "\nOCEAN HDFS virtual-name which resolves automatically to the active-node: ${OCEAN_HDFS_NODE}"
|
||||||
|
|
||||||
IMPALA_HDFS_NODE=''
|
IMPALA_HDFS_NODE=''
|
||||||
COUNTER=0
|
COUNTER=0
|
||||||
|
|
||||||
while [ $COUNTER -lt 3 ]; do
|
while [ $COUNTER -lt 3 ]; do
|
||||||
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
||||||
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
||||||
|
@ -24,75 +27,196 @@ while [ $COUNTER -lt 3 ]; do
|
||||||
fi
|
fi
|
||||||
((COUNTER++))
|
((COUNTER++))
|
||||||
done
|
done
|
||||||
|
|
||||||
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
||||||
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
|
echo -e "\n\nERROR: PROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! | AFTER ${COUNTER} RETRIES.\n\n"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
|
echo -e "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries.\n\n"
|
||||||
|
|
||||||
|
IMPALA_HOSTNAME='impala-cluster-dn1.openaire.eu'
|
||||||
|
IMPALA_CONFIG_FILE='/etc/impala_cluster/hdfs-site.xml'
|
||||||
|
|
||||||
|
IMPALA_HDFS_DB_BASE_PATH="${IMPALA_HDFS_NODE}/user/hive/warehouse"
|
||||||
|
|
||||||
|
|
||||||
|
# Set sed arguments.
|
||||||
|
LOCATION_HDFS_NODE_SED_ARG="s|${OCEAN_HDFS_NODE}|${IMPALA_HDFS_NODE}|g" # This requires to be used with "sed -e" in order to have the "|" delimiter (as the "/" conflicts with the URIs)
|
||||||
|
|
||||||
|
# Set the SED command arguments for column-names with reserved words:
|
||||||
|
DATE_SED_ARG_1='s/[[:space:]]\date[[:space:]]/\`date\`/g'
|
||||||
|
DATE_SED_ARG_2='s/\.date,/\.\`date\`,/g' # the "date" may be part of a larger field name like "datestamp" or "date_aggregated", so we need to be careful with what we are replacing.
|
||||||
|
DATE_SED_ARG_3='s/\.date[[:space:]]/\.\`date\` /g'
|
||||||
|
|
||||||
|
HASH_SED_ARG_1='s/[[:space:]]\hash[[:space:]]/\`hash\`/g'
|
||||||
|
HASH_SED_ARG_2='s/\.hash,/\.\`hash\`,/g'
|
||||||
|
HASH_SED_ARG_3='s/\.hash[[:space:]]/\.\`hash\` /g'
|
||||||
|
|
||||||
|
LOCATION_SED_ARG_1='s/[[:space:]]\location[[:space:]]/\`location\`/g'
|
||||||
|
LOCATION_SED_ARG_2='s/\.location,/\.\`location\`,/g'
|
||||||
|
LOCATION_SED_ARG_3='s/\.location[[:space:]]/\.\`location\` /g'
|
||||||
|
|
||||||
|
|
||||||
function copydb() {
|
function copydb() {
|
||||||
|
|
||||||
export HADOOP_USER="dimitris.pierrakos"
|
|
||||||
export HADOOP_USER_NAME='dimitris.pierrakos'
|
|
||||||
|
|
||||||
db=$1
|
db=$1
|
||||||
FILE=("hive_wf_tmp_"$RANDOM)
|
echo -e "\nStart processing db: '${db}'..\n"
|
||||||
hdfs dfs -mkdir ${IMPALA_HDFS_NODE}/tmp/$FILE/φ
|
|
||||||
|
|
||||||
# change ownership to impala
|
# Delete the old DB from Impala cluster (if exists).
|
||||||
# hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/${db}.db
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "drop database if exists ${db} cascade" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Make Impala aware of the deletion of the old DB immediately.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
|
||||||
# copy the databases from ocean to impala
|
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
|
||||||
echo "copying $db"
|
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
|
||||||
hadoop distcp -Dmapreduce.map.memory.mb=6144 -pb hdfs://nameservice1/user/hive/warehouse/${db}.db ${IMPALA_HDFS_NODE}/tmp/$FILE/
|
# Using max memory of: 50 * 6144 = 300 Gb
|
||||||
|
# Using 1MB as a buffer-size.
|
||||||
|
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
|
||||||
|
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
|
||||||
|
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
|
||||||
|
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
|
||||||
|
-numListstatusThreads 40 \
|
||||||
|
-copybuffersize 1048576 \
|
||||||
|
-strategy dynamic \
|
||||||
|
-pb \
|
||||||
|
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
|
||||||
|
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/${db}.db
|
# Check the exit status of the "hadoop distcp" command.
|
||||||
|
if [ $? -eq 0 ]; then
|
||||||
|
echo -e "\nSuccessfully copied the files of '${db}'.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 2
|
||||||
|
fi
|
||||||
|
|
||||||
# drop tables from db
|
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db
|
||||||
do
|
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop table $i;"`;
|
echo -e "\nCreating schema for db: '${db}'\n"
|
||||||
|
|
||||||
|
# create the new database (with the same name)
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
|
||||||
|
|
||||||
|
# Make Impala aware of the creation of the new DB immediately.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
|
||||||
|
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
|
||||||
|
|
||||||
|
all_create_view_statements=()
|
||||||
|
|
||||||
|
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
|
||||||
|
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
|
# Check if this is a view by showing the create-statement where it should print "create view" for a view, not the "create table". Unfortunately, there is no "show views" command.
|
||||||
|
create_entity_statement=`hive -e "show create table ${db}.${i};"` # It needs to happen in two stages, otherwise the "grep" is not able to match multi-line statement.
|
||||||
|
|
||||||
|
create_view_statement_test=`echo -e "$create_entity_statement" | grep 'CREATE VIEW'`
|
||||||
|
if [ -n "$create_view_statement_test" ]; then
|
||||||
|
echo -e "\n'${i}' is a view, so we will save its 'create view' statement and execute it on Impala, after all tables have been created.\n"
|
||||||
|
create_view_statement=`echo -e "$create_entity_statement" | sed 's/WARN:.*//g' | sed 's/\`//g' \
|
||||||
|
| sed 's/"$/;/' | sed 's/^"//' | sed 's/\\"\\"/\"/g' | sed -e "${LOCATION_HDFS_NODE_SED_ARG}" | sed "${DATE_SED_ARG_1}" | sed "${HASH_SED_ARG_1}" | sed "${LOCATION_SED_ARG_1}" \
|
||||||
|
| sed "${DATE_SED_ARG_2}" | sed "${HASH_SED_ARG_2}" | sed "${LOCATION_SED_ARG_2}" \
|
||||||
|
| sed "${DATE_SED_ARG_3}" | sed "${HASH_SED_ARG_3}" | sed "${LOCATION_SED_ARG_3}"`
|
||||||
|
all_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
|
||||||
|
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
|
||||||
|
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
|
||||||
|
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
|
||||||
|
else
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
|
||||||
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# drop views from db
|
echo -e "\nAll tables have been created, going to create the views..\n"
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
|
||||||
do
|
# Time to loop through the views and create them.
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop view $i;"`;
|
# At this point all table-schemas should have been created.
|
||||||
|
|
||||||
|
previous_num_of_views_to_retry=${#all_create_view_statements}
|
||||||
|
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
|
||||||
|
# Make Impala aware of the new tables, so it knows them when creating the views.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
else
|
||||||
|
echo -e "\nDB '${db}' does not contain any views.\n"
|
||||||
|
fi
|
||||||
|
|
||||||
|
level_counter=0
|
||||||
|
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
|
||||||
|
((level_counter++))
|
||||||
|
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
|
||||||
|
# In this case, we should retry creating this particular view again.
|
||||||
|
should_retry_create_view_statements=()
|
||||||
|
|
||||||
|
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
|
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
|
||||||
|
if [ -n "$specific_errors" ]; then
|
||||||
|
echo -e "\nspecific_errors: ${specific_errors}\n"
|
||||||
|
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
|
||||||
|
should_retry_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# delete the database
|
new_num_of_views_to_retry=${#should_retry_create_view_statements}
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -q "drop database if exists ${db} cascade";
|
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
|
||||||
|
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
|
||||||
# create the databases
|
return 3
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -q "create database ${db}";
|
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
|
||||||
impala-shell -q "INVALIDATE METADATA"
|
previous_num_of_views_to_retry=$new_num_of_views_to_retry
|
||||||
echo "creating schema for ${db}"
|
else
|
||||||
for (( k = 0; k < 5; k ++ )); do
|
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
|
||||||
for i in `impala-shell -d ${db} --delimited -q "show tables"`;
|
fi
|
||||||
do
|
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
|
||||||
impala-shell -d ${db} --delimited -q "show create table $i";
|
|
||||||
done | sed 's/"$/;/' | sed 's/^"//' | sed 's/[[:space:]]\date[[:space:]]/`date`/g' | impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -c -f -
|
|
||||||
done
|
done
|
||||||
|
|
||||||
# load the data from /tmp in the respective tables
|
sleep 1
|
||||||
echo "copying data in tables and computing stats"
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
for i in `impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
sleep 1
|
||||||
do
|
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "load data inpath '/tmp/$FILE/${db}.db/$i' into table $i";
|
echo -e "\nComputing stats for tables..\n"
|
||||||
impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "compute stats $i";
|
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
|
||||||
|
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
|
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
|
||||||
|
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
|
||||||
|
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# deleting the remaining directory from hdfs
|
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -rm -R /tmp/$FILE/${db}.db
|
echo -e "\nAll entities have been copied to Impala cluster.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 4
|
||||||
|
fi
|
||||||
|
|
||||||
|
rm -f error.log
|
||||||
|
echo -e "\n\nFinished processing db: ${db}\n\n"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MONITOR_DB=$1
|
MONITOR_DB=$1
|
||||||
#HADOOP_USER_NAME=$2
|
|
||||||
|
|
||||||
copydb $MONITOR_DB'_institutions'
|
copydb $MONITOR_DB'_institutions'
|
||||||
copydb $MONITOR_DB
|
copydb $MONITOR_DB
|
||||||
|
|
|
@ -6,9 +6,13 @@ then
|
||||||
ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder}
|
ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder}
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
# Set the active HDFS node of OCEAN and IMPALA cluster.
|
||||||
|
OCEAN_HDFS_NODE='hdfs://nameservice1'
|
||||||
|
echo -e "\nOCEAN HDFS virtual-name which resolves automatically to the active-node: ${OCEAN_HDFS_NODE}"
|
||||||
|
|
||||||
IMPALA_HDFS_NODE=''
|
IMPALA_HDFS_NODE=''
|
||||||
COUNTER=0
|
COUNTER=0
|
||||||
|
|
||||||
while [ $COUNTER -lt 3 ]; do
|
while [ $COUNTER -lt 3 ]; do
|
||||||
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
if hdfs dfs -test -e hdfs://impala-cluster-mn1.openaire.eu/tmp >/dev/null 2>&1; then
|
||||||
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
IMPALA_HDFS_NODE='hdfs://impala-cluster-mn1.openaire.eu:8020'
|
||||||
|
@ -22,76 +26,195 @@ while [ $COUNTER -lt 3 ]; do
|
||||||
fi
|
fi
|
||||||
((COUNTER++))
|
((COUNTER++))
|
||||||
done
|
done
|
||||||
|
|
||||||
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
if [ -z "$IMPALA_HDFS_NODE" ]; then
|
||||||
echo -e "\n\nPROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! $COUNTER\n\n"
|
echo -e "\n\nERROR: PROBLEM WHEN SETTING THE HDFS-NODE FOR IMPALA CLUSTER! | AFTER ${COUNTER} RETRIES.\n\n"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
echo "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries."
|
echo -e "Active IMPALA HDFS Node: ${IMPALA_HDFS_NODE} , after ${COUNTER} retries.\n\n"
|
||||||
|
|
||||||
|
IMPALA_HOSTNAME='impala-cluster-dn1.openaire.eu'
|
||||||
|
IMPALA_CONFIG_FILE='/etc/impala_cluster/hdfs-site.xml'
|
||||||
|
|
||||||
|
IMPALA_HDFS_DB_BASE_PATH="${IMPALA_HDFS_NODE}/user/hive/warehouse"
|
||||||
|
|
||||||
|
# Set sed arguments.
|
||||||
|
LOCATION_HDFS_NODE_SED_ARG="s|${OCEAN_HDFS_NODE}|${IMPALA_HDFS_NODE}|g" # This requires to be used with "sed -e" in order to have the "|" delimiter (as the "/" conflicts with the URIs)
|
||||||
|
|
||||||
|
# Set the SED command arguments for column-names with reserved words:
|
||||||
|
DATE_SED_ARG_1='s/[[:space:]]\date[[:space:]]/\`date\`/g'
|
||||||
|
DATE_SED_ARG_2='s/\.date,/\.\`date\`,/g' # the "date" may be part of a larger field name like "datestamp" or "date_aggregated", so we need to be careful with what we are replacing.
|
||||||
|
DATE_SED_ARG_3='s/\.date[[:space:]]/\.\`date\` /g'
|
||||||
|
|
||||||
|
HASH_SED_ARG_1='s/[[:space:]]\hash[[:space:]]/\`hash\`/g'
|
||||||
|
HASH_SED_ARG_2='s/\.hash,/\.\`hash\`,/g'
|
||||||
|
HASH_SED_ARG_3='s/\.hash[[:space:]]/\.\`hash\` /g'
|
||||||
|
|
||||||
|
LOCATION_SED_ARG_1='s/[[:space:]]\location[[:space:]]/\`location\`/g'
|
||||||
|
LOCATION_SED_ARG_2='s/\.location,/\.\`location\`,/g'
|
||||||
|
LOCATION_SED_ARG_3='s/\.location[[:space:]]/\.\`location\` /g'
|
||||||
|
|
||||||
|
|
||||||
export HADOOP_USER_NAME=$6
|
export HADOOP_USER_NAME=$6
|
||||||
export PROD_USAGE_STATS_DB="openaire_prod_usage_stats"
|
export PROD_USAGE_STATS_DB="openaire_prod_usage_stats"
|
||||||
|
|
||||||
|
|
||||||
function copydb() {
|
function copydb() {
|
||||||
db=$1
|
db=$1
|
||||||
FILE=("hive_wf_tmp_"$RANDOM)
|
echo -e "\nStart processing db: '${db}'..\n"
|
||||||
hdfs dfs -mkdir ${IMPALA_HDFS_NODE}/tmp/$FILE/
|
|
||||||
# copy the databases from ocean to impala
|
|
||||||
|
|
||||||
echo "copying $db"
|
# Delete the old DB from Impala cluster (if exists).
|
||||||
hadoop distcp -Dmapreduce.map.memory.mb=6144 -pb hdfs://nameservice1/user/hive/warehouse/${db}.db ${IMPALA_HDFS_NODE}/tmp/$FILE/
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "drop database if exists ${db} cascade" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN DROPPING THE OLD DATABASE! EXITING...\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
# change ownership to impala
|
# Make Impala aware of the deletion of the old DB immediately.
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -chmod -R 777 /tmp/$FILE/${db}.db
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
|
||||||
# drop tables from db
|
echo -e "\n\nCopying files of '${db}', from Ocean to Impala cluster..\n"
|
||||||
for i in `impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
# Using max-bandwidth of: 50 * 100 Mb/s = 5 Gb/s
|
||||||
do
|
# Using max memory of: 50 * 6144 = 300 Gb
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop table $i;"`;
|
# Using 1MB as a buffer-size.
|
||||||
|
# The " -Ddistcp.dynamic.recordsPerChunk=50" arg is not available in our version of hadoop
|
||||||
|
# The "ug" args cannot be used as we get a "User does not belong to hive" error.
|
||||||
|
# The "p" argument cannot be used, as it blocks the files from being used, giving a "sticky bit"-error, even after applying chmod and chown onm the files.
|
||||||
|
hadoop distcp -Dmapreduce.map.memory.mb=6144 -m 70 -bandwidth 150 \
|
||||||
|
-numListstatusThreads 40 \
|
||||||
|
-copybuffersize 1048576 \
|
||||||
|
-strategy dynamic \
|
||||||
|
-pb \
|
||||||
|
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
|
||||||
|
|
||||||
|
# Check the exit status of the "hadoop distcp" command.
|
||||||
|
if [ $? -eq 0 ]; then
|
||||||
|
echo -e "\nSuccessfully copied the files of '${db}'.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT WITH EXIT STATUS: $?\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 2
|
||||||
|
fi
|
||||||
|
|
||||||
|
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well..
|
||||||
|
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db
|
||||||
|
|
||||||
|
echo -e "\nCreating schema for db: '${db}'\n"
|
||||||
|
|
||||||
|
# create the new database (with the same name)
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create database ${db}"
|
||||||
|
|
||||||
|
# Make Impala aware of the creation of the new DB immediately.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
# Because "Hive" and "Impala" do not have compatible schemas, we cannot use the "show create table <name>" output from hive to create the exact same table in impala.
|
||||||
|
# So, we have to find at least one parquet file (check if it's there) from the table in the ocean cluster for impala to use it to extract the table-schema itself from that file.
|
||||||
|
|
||||||
|
all_create_view_statements=()
|
||||||
|
|
||||||
|
entities_on_ocean=`hive -e "show tables in ${db};" | sed 's/WARN:.*//g'` # Get the tables and views without any potential the "WARN" logs.
|
||||||
|
for i in ${entities_on_ocean[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
|
# Check if this is a view by showing the create-statement where it should print "create view" for a view, not the "create table". Unfortunately, there is no "show views" command.
|
||||||
|
create_entity_statement=`hive -e "show create table ${db}.${i};"` # It needs to happen in two stages, otherwise the "grep" is not able to match multi-line statement.
|
||||||
|
|
||||||
|
create_view_statement_test=`echo -e "$create_entity_statement" | grep 'CREATE VIEW'`
|
||||||
|
if [ -n "$create_view_statement_test" ]; then
|
||||||
|
echo -e "\n'${i}' is a view, so we will save its 'create view' statement and execute it on Impala, after all tables have been created.\n"
|
||||||
|
create_view_statement=`echo -e "$create_entity_statement" | sed 's/WARN:.*//g' | sed 's/\`//g' \
|
||||||
|
| sed 's/"$/;/' | sed 's/^"//' | sed 's/\\"\\"/\"/g' | sed -e "${LOCATION_HDFS_NODE_SED_ARG}" | sed "${DATE_SED_ARG_1}" | sed "${HASH_SED_ARG_1}" | sed "${LOCATION_SED_ARG_1}" \
|
||||||
|
| sed "${DATE_SED_ARG_2}" | sed "${HASH_SED_ARG_2}" | sed "${LOCATION_SED_ARG_2}" \
|
||||||
|
| sed "${DATE_SED_ARG_3}" | sed "${HASH_SED_ARG_3}" | sed "${LOCATION_SED_ARG_3}"`
|
||||||
|
all_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
echo -e "\n'${i}' is a table, so we will check for its parquet files and create the table on Impala cluster.\n"
|
||||||
|
CURRENT_PRQ_FILE=`hdfs dfs -conf ${IMPALA_CONFIG_FILE} -ls -C "${IMPALA_HDFS_DB_BASE_PATH}/${db}.db/${i}/" | grep -v 'Found' | grep -v '_impala_insert_staging' | head -1`
|
||||||
|
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
|
||||||
|
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
|
||||||
|
else
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
|
||||||
|
log_errors=`cat error.log | grep -E "WARN|ERROR|FAILED"`
|
||||||
|
if [ -n "$log_errors" ]; then
|
||||||
|
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# drop views from db
|
echo -e "\nAll tables have been created, going to create the views..\n"
|
||||||
for i in `impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
|
||||||
do
|
# Time to loop through the views and create them.
|
||||||
`impala-shell -i impala-cluster-dn1.openaire.eu -d ${db} -q "drop view $i;"`;
|
# At this point all table-schemas should have been created.
|
||||||
|
|
||||||
|
previous_num_of_views_to_retry=${#all_create_view_statements}
|
||||||
|
if [[ $previous_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nAll_create_view_statements:\n\n${all_create_view_statements[@]}\n" # DEBUG
|
||||||
|
# Make Impala aware of the new tables, so it knows them when creating the views.
|
||||||
|
sleep 1
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
|
sleep 1
|
||||||
|
else
|
||||||
|
echo -e "\nDB '${db}' does not contain any views.\n"
|
||||||
|
fi
|
||||||
|
|
||||||
|
level_counter=0
|
||||||
|
while [[ ${#all_create_view_statements[@]} -gt 0 ]]; do
|
||||||
|
((level_counter++))
|
||||||
|
# The only accepted reason for a view to not be created, is if it depends on another view, which has not been created yet.
|
||||||
|
# In this case, we should retry creating this particular view again.
|
||||||
|
should_retry_create_view_statements=()
|
||||||
|
|
||||||
|
for create_view_statement in "${all_create_view_statements[@]}"; do # Here we use double quotes, as the elements are phrases, instead of single-words.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "${create_view_statement}" |& tee error.log # impala-shell prints all logs in stderr, so wee need to capture them and put them in a file, in order to perform "grep" on them later
|
||||||
|
specific_errors=`cat error.log | grep -E "FAILED: ParseException line 1:13 missing TABLE at 'view'|ERROR: AnalysisException: Could not resolve table reference:"`
|
||||||
|
if [ -n "$specific_errors" ]; then
|
||||||
|
echo -e "\nspecific_errors: ${specific_errors}\n"
|
||||||
|
echo -e "\nView '$(cat error.log | grep "CREATE VIEW " | sed 's/CREATE VIEW //g' | sed 's/ as select .*//g')' failed to be created, possibly because it depends on another view.\n"
|
||||||
|
should_retry_create_view_statements+=("$create_view_statement")
|
||||||
|
else
|
||||||
|
sleep 1 # Wait a bit for Impala to register that the view was created, before possibly referencing it by another view.
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# delete the database
|
new_num_of_views_to_retry=${#should_retry_create_view_statements}
|
||||||
impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -q "drop database if exists ${db} cascade";
|
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
|
||||||
|
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
|
||||||
# create the databases
|
return 3
|
||||||
impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -q "create database ${db}";
|
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
|
||||||
|
echo -e "\nTo be retried \"create_view_statements\":\n\n${should_retry_create_view_statements[@]}\n"
|
||||||
impala-shell --user $HADOOP_USER_NAME -q "INVALIDATE METADATA"
|
previous_num_of_views_to_retry=$new_num_of_views_to_retry
|
||||||
echo "creating schema for ${db}"
|
else
|
||||||
for (( k = 0; k < 5; k ++ )); do
|
echo -e "\nFinished creating views, for db: '${db}', in level-${level_counter}.\n"
|
||||||
for i in `impala-shell --user $HADOOP_USER_NAME -d ${db} --delimited -q "show tables"`;
|
fi
|
||||||
do
|
all_create_view_statements=("${should_retry_create_view_statement[@]}") # This is needed in any case to either move forward with the rest of the views or stop at 0 remaining views.
|
||||||
impala-shell --user $HADOOP_USER_NAME -d ${db} --delimited -q "show create table $i";
|
|
||||||
done | sed 's/"$/;/' | sed 's/^"//' | sed 's/[[:space:]]\date[[:space:]]/`date`/g' | impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -c -f -
|
|
||||||
done
|
done
|
||||||
|
|
||||||
# for i in `impala-shell --user $HADOOP_USER_NAME -d ${db} --delimited -q "show tables"`;
|
sleep 1
|
||||||
# do
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "INVALIDATE METADATA"
|
||||||
# impala-shell --user $HADOOP_USER_NAME -d ${db} --delimited -q "show create table $i";
|
sleep 1
|
||||||
# done | sed 's/"$/;/' | sed 's/^"//' | sed 's/[[:space:]]\date[[:space:]]/`date`/g' | impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -c -f -
|
|
||||||
#
|
|
||||||
# # run the same command twice because we may have failures in the first run (due to views pointing to the same db)
|
|
||||||
# for i in `impala-shell --user $HADOOP_USER_NAME -d ${db} --delimited -q "show tables"`;
|
|
||||||
# do
|
|
||||||
# impala-shell --user $HADOOP_USER_NAME -d ${db} --delimited -q "show create table $i";
|
|
||||||
# done | sed 's/"$/;/' | sed 's/^"//' | sed 's/[[:space:]]\date[[:space:]]/`date`/g' | impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -c -f -
|
|
||||||
|
|
||||||
# load the data from /tmp in the respective tables
|
echo -e "\nComputing stats for tables..\n"
|
||||||
echo "copying data in tables and computing stats"
|
entities_on_impala=`impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} --delimited -q "show tables in ${db}"`
|
||||||
for i in `impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -d ${db} --delimited -q "show tables"`;
|
for i in ${entities_on_impala[@]}; do # Use un-quoted values, as the elemetns are single-words.
|
||||||
do
|
# Taking the create table statement from the Ocean cluster, just to check if its a view, as the output is easier than using impala-shell from Impala cluster.
|
||||||
impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -d ${db} -q "load data inpath '/tmp/$FILE/${db}.db/$i' into table $i";
|
create_view_statement=`hive -e "show create table ${db}.${i};" | grep "CREATE VIEW"` # This grep works here, as we do not want to match multiple-lines.
|
||||||
impala-shell --user $HADOOP_USER_NAME -i impala-cluster-dn1.openaire.eu -d ${db} -q "compute stats $i";
|
if [ -z "$create_view_statement" ]; then # If it's a table, then go load the data to it.
|
||||||
|
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "compute stats ${db}.${i}";
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# deleting the remaining directory from hdfs
|
if [ "${entities_on_impala[@]}" == "${entities_on_ocean[@]}" ]; then
|
||||||
hdfs dfs -conf /etc/impala_cluster/hdfs-site.xml -rm -R /tmp/$FILE/${db}.db
|
echo -e "\nAll entities have been copied to Impala cluster.\n"
|
||||||
|
else
|
||||||
|
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
|
||||||
|
rm -f error.log
|
||||||
|
return 4
|
||||||
|
fi
|
||||||
|
|
||||||
|
rm -f error.log
|
||||||
|
echo -e "\n\nFinished processing db: ${db}\n\n"
|
||||||
}
|
}
|
||||||
|
|
||||||
STATS_DB=$1
|
STATS_DB=$1
|
||||||
|
|
Loading…
Reference in New Issue