forked from antonis.lempesis/dnet-hadoop
[ActionManagement] reduced number of xqueries used to access ActionSet info
This commit is contained in:
parent
7dc824fc23
commit
25254885b9
|
@ -3,20 +3,23 @@ package eu.dnetlib.dhp.actionmanager;
|
|||
|
||||
import java.io.Serializable;
|
||||
import java.io.StringReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import eu.dnetlib.actionmanager.rmi.ActionManagerException;
|
||||
import eu.dnetlib.actionmanager.set.ActionManagerSet;
|
||||
|
@ -25,6 +28,7 @@ import eu.dnetlib.dhp.actionmanager.partition.PartitionActionSetsByPayloadTypeJo
|
|||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class ISClient implements Serializable {
|
||||
|
||||
|
@ -40,80 +44,52 @@ public class ISClient implements Serializable {
|
|||
|
||||
public List<String> getLatestRawsetPaths(String setIds) {
|
||||
|
||||
List<String> ids = Lists
|
||||
.newArrayList(
|
||||
final Set<String> ids = Sets
|
||||
.newHashSet(
|
||||
Splitter
|
||||
.on(INPUT_ACTION_SET_ID_SEPARATOR)
|
||||
.omitEmptyStrings()
|
||||
.trimResults()
|
||||
.split(setIds));
|
||||
|
||||
return ids
|
||||
.stream()
|
||||
.map(id -> getSet(isLookup, id))
|
||||
.map(as -> as.getPathToLatest())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
private ActionManagerSet getSet(ISLookUpService isLookup, final String setId) {
|
||||
|
||||
final String q = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') "
|
||||
+ "where $x//SET/@id = '"
|
||||
+ setId
|
||||
+ "' return $x";
|
||||
|
||||
try {
|
||||
final String basePath = getBasePathHDFS(isLookup);
|
||||
final String setProfile = isLookup.getResourceProfileByQuery(q);
|
||||
return getActionManagerSet(basePath, setProfile);
|
||||
} catch (ISLookUpException | ActionManagerException e) {
|
||||
throw new RuntimeException("Error accessing Sets, using query: " + q);
|
||||
|
||||
// <SET id="..." directory="..." latest="xxx"/>
|
||||
final String xquery = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') "
|
||||
+
|
||||
"return <SET id='{$x//SET/@id/string()}' directory='{$x//SET/@directory/string()}' latest='{$x//LATEST/@id/string()}'/>";
|
||||
return Optional
|
||||
.ofNullable(isLookup.quickSearchProfile(xquery))
|
||||
.map(
|
||||
sets -> sets
|
||||
.stream()
|
||||
.map(set -> parseSetInfo(set))
|
||||
.filter(t -> ids.contains(t.getLeft()))
|
||||
.map(t -> buildDirectory(basePath, t))
|
||||
.collect(Collectors.toList()))
|
||||
.orElseThrow(() -> new IllegalStateException("empty set list"));
|
||||
} catch (ActionManagerException | ISLookUpException e) {
|
||||
throw new IllegalStateException("unable to query ActionSets info from the IS");
|
||||
}
|
||||
}
|
||||
|
||||
private ActionManagerSet getActionManagerSet(final String basePath, final String profile)
|
||||
throws ActionManagerException {
|
||||
final SAXReader reader = new SAXReader();
|
||||
final ActionManagerSet set = new ActionManagerSet();
|
||||
|
||||
private Triple<String, String, String> parseSetInfo(String set) {
|
||||
try {
|
||||
final Document doc = reader.read(new StringReader(profile));
|
||||
|
||||
set.setId(doc.valueOf("//SET/@id").trim());
|
||||
set.setName(doc.valueOf("//SET").trim());
|
||||
set.setImpact(ImpactTypes.valueOf(doc.valueOf("//IMPACT").trim()));
|
||||
set
|
||||
.setLatest(
|
||||
doc.valueOf("//RAW_SETS/LATEST/@id"),
|
||||
doc.valueOf("//RAW_SETS/LATEST/@creationDate"),
|
||||
doc.valueOf("//RAW_SETS/LATEST/@lastUpdate"));
|
||||
set.setDirectory(doc.valueOf("//SET/@directory"));
|
||||
final List expiredNodes = doc.selectNodes("//RAW_SETS/EXPIRED");
|
||||
if (expiredNodes != null) {
|
||||
for (int i = 0; i < expiredNodes.size(); i++) {
|
||||
Element ex = (Element) expiredNodes.get(i);
|
||||
set
|
||||
.addExpired(
|
||||
ex.attributeValue("id"),
|
||||
ex.attributeValue("creationDate"),
|
||||
ex.attributeValue("lastUpdate"));
|
||||
}
|
||||
}
|
||||
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append(basePath);
|
||||
sb.append("/");
|
||||
sb.append(doc.valueOf("//SET/@directory"));
|
||||
sb.append("/");
|
||||
sb.append(doc.valueOf("//RAW_SETS/LATEST/@id"));
|
||||
set.setPathToLatest(sb.toString());
|
||||
|
||||
return set;
|
||||
} catch (Exception e) {
|
||||
throw new ActionManagerException("Error creating set from profile: " + profile, e);
|
||||
Document doc = new SAXReader().read(new StringReader(set));
|
||||
return Triple
|
||||
.of(
|
||||
doc.valueOf("//SET/@id"),
|
||||
doc.valueOf("//SET/@directory"),
|
||||
doc.valueOf("//SET/@latest"));
|
||||
} catch (DocumentException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildDirectory(String basePath, Triple<String, String, String> t) {
|
||||
return Joiner.on("/").join(basePath, t.getMiddle(), t.getRight());
|
||||
}
|
||||
|
||||
private String getBasePathHDFS(ISLookUpService isLookup) throws ActionManagerException {
|
||||
return queryServiceProperty(isLookup, "basePath");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue