merging with branch beta

Miriam Baglioni 2024-12-20 14:42:11 +01:00
commit 38572266d6
41 changed files with 1892 additions and 491 deletions

View File

@@ -1,7 +1,6 @@
package eu.dnetlib.dhp.collection.plugin.gtr2;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@@ -11,12 +10,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.slf4j.Logger;
@@ -28,8 +32,6 @@ import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class Gtr2PublicationsIterator implements Iterator<String> {
public static final int PAGE_SIZE = 20;
private static final Logger log = LoggerFactory.getLogger(Gtr2PublicationsIterator.class);
private final HttpConnector2 connector;
@@ -42,7 +44,6 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
private int endPage;
private boolean incremental = false;
private LocalDate fromDate;
private final Map<String, String> cache = new HashMap<>();
private final Queue<String> queue = new LinkedList<>();
@@ -88,7 +89,7 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
private void prepareNextElement() {
while ((this.currPage <= this.endPage) && this.queue.isEmpty()) {
log.debug("FETCHING PAGE + " + this.currPage + "/" + this.endPage);
log.info("FETCHING PAGE + " + this.currPage + "/" + this.endPage);
this.queue.addAll(fetchPage(this.currPage++));
}
this.nextElement = this.queue.poll();
@@ -97,18 +98,17 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
private List<String> fetchPage(final int pageNumber) {
final List<String> res = new ArrayList<>();
try {
final Document doc = loadURL(cleanURL(this.baseUrl + "/outcomes/publications?p=" + pageNumber), 0);
if (this.endPage == Integer.MAX_VALUE) {
this.endPage = NumberUtils.toInt(doc.valueOf("/*/@*[local-name() = 'totalPages']"));
try {
final Document doc = loadURL(this.baseUrl + "/publication?page=" + pageNumber, 0);
}
for (final Object po : doc.selectNodes("//*[local-name() = 'publication']")) {
final Element mainEntity = (Element) ((Element) po).detach();
if (filterIncremental(mainEntity)) {
res.add(expandMainEntity(mainEntity));
final String publicationOverview = mainEntity.attributeValue("url");
res.add(loadURL(publicationOverview, -1).asXML());
} else {
log.debug("Skipped entity");
}
@@ -122,34 +122,6 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
return res;
}
private void addLinkedEntities(final Element master, final String relType, final Element newRoot,
final Function<Document, Element> mapper) {
for (final Object o : master.selectNodes(".//*[local-name()='link']")) {
final String rel = ((Element) o).valueOf("@*[local-name()='rel']");
final String href = ((Element) o).valueOf("@*[local-name()='href']");
if (relType.equals(rel) && StringUtils.isNotBlank(href)) {
final String cacheKey = relType + "#" + href;
if (this.cache.containsKey(cacheKey)) {
try {
log.debug(" * from cache (" + relType + "): " + href);
newRoot.add(DocumentHelper.parseText(this.cache.get(cacheKey)).getRootElement());
} catch (final DocumentException e) {
log.error("Error retrieving cache element: " + cacheKey, e);
throw new RuntimeException("Error retrieving cache element: " + cacheKey, e);
}
} else {
final Document doc = loadURL(cleanURL(href), 0);
final Element elem = mapper.apply(doc);
newRoot.add(elem);
this.cache.put(cacheKey, elem.asXML());
}
}
}
}
private boolean filterIncremental(final Element e) {
if (!this.incremental || isAfter(e.valueOf("@*[local-name() = 'created']"), this.fromDate)
|| isAfter(e.valueOf("@*[local-name() = 'updated']"), this.fromDate)) {
@@ -158,40 +130,34 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
return false;
}
private String expandMainEntity(final Element mainEntity) {
final Element newRoot = DocumentHelper.createElement("doc");
newRoot.add(mainEntity);
addLinkedEntities(mainEntity, "PROJECT", newRoot, this::asProjectElement);
return DocumentHelper.createDocument(newRoot).asXML();
}
private Element asProjectElement(final Document doc) {
final Element newOrg = DocumentHelper.createElement("project");
newOrg.addElement("id").setText(doc.valueOf("/*/@*[local-name()='id']"));
newOrg
.addElement("code")
.setText(doc.valueOf("//*[local-name()='identifier' and @*[local-name()='type'] = 'RCUK']"));
newOrg.addElement("title").setText(doc.valueOf("//*[local-name()='title']"));
return newOrg;
}
private static String cleanURL(final String url) {
String cleaned = url;
if (cleaned.contains("gtr.gtr")) {
cleaned = cleaned.replace("gtr.gtr", "gtr");
}
if (cleaned.startsWith("http://")) {
cleaned = cleaned.replaceFirst("http://", "https://");
}
return cleaned;
}
private Document loadURL(final String cleanUrl, final int attempt) {
try {
try (final CloseableHttpClient client = HttpClients.createDefault()) {
log.debug(" * Downloading Url: {}", cleanUrl);
final byte[] bytes = this.connector.getInputSource(cleanUrl).getBytes(StandardCharsets.UTF_8);
return DocumentHelper.parseText(new String(bytes));
final HttpGet req = new HttpGet(cleanUrl);
req.setHeader(HttpHeaders.ACCEPT, "application/xml");
try (final CloseableHttpResponse response = client.execute(req)) {
if (endPage == Integer.MAX_VALUE)
for (final Header header : response.getAllHeaders()) {
log.debug("HEADER: " + header.getName() + " = " + header.getValue());
if ("Link-Pages".equals(header.getName())) {
if (Integer.parseInt(header.getValue()) < endPage)
endPage = Integer.parseInt(header.getValue());
}
}
final String content = IOUtils.toString(response.getEntity().getContent());
return DocumentHelper.parseText(content);
}
} catch (final Throwable e) {
if (attempt == -1)
try {
return DocumentHelper.parseText("<empty></empty>");
} catch (Throwable t) {
throw new RuntimeException();
}
log.error("Error dowloading url: {}, attempt = {}", cleanUrl, attempt, e); log.error("Error dowloading url: {}, attempt = {}", cleanUrl, attempt, e);
if (attempt >= MAX_ATTEMPTS) { if (attempt >= MAX_ATTEMPTS) {
throw new RuntimeException("Error downloading url: " + cleanUrl, e); throw new RuntimeException("Error downloading url: " + cleanUrl, e);

View File

@@ -13,7 +13,7 @@ import eu.dnetlib.dhp.common.collection.HttpClientParams;
class Gtr2PublicationsIteratorTest {
private static final String baseURL = "https://gtr.ukri.org/gtr/api";
private static final String baseURL = "https://gtr.ukri.org/api";
private static final HttpClientParams clientParams = new HttpClientParams();
@@ -34,7 +34,7 @@ class Gtr2PublicationsIteratorTest {
@Test
@Disabled
public void testPaging() throws Exception {
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, "2", "2", clientParams);
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, "2", "3", clientParams);
while (iterator.hasNext()) {
Thread.sleep(300);
@@ -47,9 +47,9 @@ class Gtr2PublicationsIteratorTest {
@Test
@Disabled
public void testOnePage() throws Exception {
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, "12", "12", clientParams);
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, "379", "380", clientParams);
final int count = iterateAndCount(iterator);
assertEquals(20, count);
assertEquals(50, count);
}
@Test

View File

@@ -8,6 +8,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.jetbrains.annotations.NotNull;
/**
@@ -37,15 +38,15 @@ public class QueryCommunityAPI {
}
public static String community(String id, String baseURL) throws IOException {
return get(baseURL + id);
public static String subcommunities(String communityId, String baseURL) throws IOException {
return get(baseURL + communityId + "/subcommunities");
}
public static String communityDatasource(String id, String baseURL) throws IOException {
return get(baseURL + id + "/contentproviders");
return get(baseURL + id + "/datasources");
}
@@ -61,6 +62,10 @@
}
public static String propagationOrganizationCommunityMap(String baseURL) throws IOException {
return get(StringUtils.substringBefore(baseURL, "community") + "propagationOrganizationCommunityMap");
}
@NotNull
private static String getBody(HttpURLConnection conn) throws IOException {
String body = "{}";
@@ -78,4 +83,24 @@
return body;
}
public static String subcommunityDatasource(String communityId, String subcommunityId, String baseURL)
throws IOException {
return get(baseURL + communityId + "/subcommunities/datasources?subCommunityId=" + subcommunityId);
}
public static String subcommunityPropagationOrganization(String communityId, String subcommunityId, String baseURL)
throws IOException {
return get(baseURL + communityId + "/subcommunities/propagationOrganizations?subCommunityId=" + subcommunityId);
}
public static String subcommunityProjects(String communityId, String subcommunityId, String page, String size,
String baseURL) throws IOException {
return get(
baseURL + communityId + "/subcommunities/projects/" + page + "/" + size + "?subCommunityId="
+ subcommunityId);
}
public static String propagationDatasourceCommunityMap(String baseURL) throws IOException {
return get(baseURL + "/propagationDatasourceCommunityMap");
}
}
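As an aside, a hedged sketch of how the new endpoints are typically consumed. The base URL is the production value from the job properties later in this changeset and "clarin" is the community id exercised in BulkTagJobTest; treat both as placeholders.

public class QueryCommunityAPIExample {
	public static void main(String[] args) throws java.io.IOException {
		final String baseURL = "https://services.openaire.eu/openaire/community/";
		final String communityId = "clarin";
		// raw JSON replies; Utils deserializes them with Jackson
		String subCommunities = QueryCommunityAPI.subcommunities(communityId, baseURL);
		String datasources = QueryCommunityAPI.communityDatasource(communityId, baseURL);
		String firstProjectPage = QueryCommunityAPI.communityProjects(communityId, "0", "100", baseURL);
		System.out.println(subCommunities);
		System.out.println(datasources);
		System.out.println(firstProjectPage);
	}
}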

View File

@@ -8,9 +8,9 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
@@ -33,73 +32,137 @@ public class Utils implements Serializable {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final VerbResolver resolver = VerbResolverFactory.newInstance();
private static final Logger log = LoggerFactory.getLogger(Utils.class);
@FunctionalInterface
private interface ProjectQueryFunction {
String query(int page, int size);
}
@FunctionalInterface
private interface DatasourceQueryFunction {
String query();
}
// PROJECT METHODS
public static CommunityEntityMap getProjectCommunityMap(String baseURL) throws IOException {
CommunityEntityMap projectMap = new CommunityEntityMap();
public static CommunityConfiguration getCommunityConfiguration(String baseURL) throws IOException {
final Map<String, Community> communities = Maps.newHashMap();
List<Community> validCommunities = new ArrayList<>();
getValidCommunities(baseURL)
.forEach(community -> {
addRelevantProjects(community.getId(), baseURL, projectMap);
try {
CommunityModel cm = MAPPER
.readValue(QueryCommunityAPI.community(community.getId(), baseURL), CommunityModel.class);
validCommunities.add(getCommunity(cm));
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
subcommunities
.forEach(
sc -> addRelevantProjects(community.getId(), sc.getSubCommunityId(), baseURL, projectMap));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
validCommunities.forEach(community -> {
return projectMap;
}
private static void addRelevantProjects(
String communityId,
String baseURL,
CommunityEntityMap communityEntityMap) {
fetchAndProcessProjects(
(page, size) -> {
try {
return QueryCommunityAPI
.communityProjects(communityId, String.valueOf(page), String.valueOf(size), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
},
communityId,
communityEntityMap);
}
private static void addRelevantProjects(
String communityId,
String subcommunityId,
String baseURL,
CommunityEntityMap communityEntityMap) {
fetchAndProcessProjects(
(page, size) -> {
try {
return QueryCommunityAPI
.subcommunityProjects(
communityId, subcommunityId, String.valueOf(page), String.valueOf(size), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
},
communityId,
communityEntityMap);
}
private static void fetchAndProcessProjects(
ProjectQueryFunction projectQueryFunction,
String communityId,
CommunityEntityMap communityEntityMap) {
int page = 0;
final int size = 100;
ContentModel contentModel;
do {
try {
DatasourceList dl = MAPPER
.readValue(
String response = projectQueryFunction.query(page, size);
contentModel = MAPPER.readValue(response, ContentModel.class);
QueryCommunityAPI.communityDatasource(community.getId(), baseURL), DatasourceList.class);
community.setProviders(dl.stream().map(d -> {
if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
return null;
Provider p = new Provider();
p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId());
p.setSelectionConstraints(d.getSelectioncriteria());
if (p.getSelectionConstraints() != null)
p.getSelectionConstraints().setSelection(resolver);
return p;
if (!contentModel.getContent().isEmpty()) {
contentModel
.getContent()
.forEach(
project -> communityEntityMap
.add(
ModelSupport.getIdPrefix(Project.class) + "|" + project.getOpenaireId(),
communityId));
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
} catch (IOException e) {
throw new RuntimeException(e);
throw new RuntimeException("Error processing projects for community: " + communityId, e);
}
});
page++;
} while (!contentModel.getLast());
validCommunities.forEach(community -> {
if (community.isValid())
communities.put(community.getId(), community);
});
return new CommunityConfiguration(communities);
}
private static Community getCommunity(CommunityModel cm) {
Community c = new Community();
c.setId(cm.getId());
c.setZenodoCommunities(cm.getOtherZenodoCommunities());
if (StringUtils.isNotBlank(cm.getZenodoCommunity()))
c.getZenodoCommunities().add(cm.getZenodoCommunity());
c.setSubjects(cm.getSubjects());
private static List<Provider> getCommunityContentProviders(
DatasourceQueryFunction datasourceQueryFunction) {
try {
String response = datasourceQueryFunction.query();
List<CommunityContentprovider> datasourceList = MAPPER
.readValue(response, new TypeReference<List<CommunityContentprovider>>() {
});
c.getSubjects().addAll(cm.getFos());
c.getSubjects().addAll(cm.getSdg());
if (cm.getAdvancedConstraints() != null) {
c.setConstraints(cm.getAdvancedConstraints());
c.getConstraints().setSelection(resolver);
return datasourceList.stream().map(d -> {
if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
return null;
Provider p = new Provider();
p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId());
p.setSelectionConstraints(d.getSelectioncriteria());
if (p.getSelectionConstraints() != null)
p.getSelectionConstraints().setSelection(resolver);
return p;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Error processing datasource information: " + e);
} }
if (cm.getRemoveConstraints() != null) {
c.setRemoveConstraints(cm.getRemoveConstraints());
c.getRemoveConstraints().setSelection(resolver);
}
return c;
}
/**
* Select the communities with status different from hidden
* @param baseURL the base url of the API to be queried
* @return the list of communities in the CommunityModel class
* @throws IOException
*/
public static List<CommunityModel> getValidCommunities(String baseURL) throws IOException {
return MAPPER
.readValue(QueryCommunityAPI.communities(baseURL), CommunitySummary.class)
List<CommunityModel> listCommunity = MAPPER
.readValue(QueryCommunityAPI.communities(baseURL), new TypeReference<List<CommunityModel>>() {
});
return listCommunity
.stream()
.filter(
community -> !community.getStatus().equals("hidden") &&
@@ -107,108 +170,217 @@ public class Utils implements Serializable {
.collect(Collectors.toList());
}
/**
* Sets the Community information from the replies of the communityAPIs
* @param baseURL the base url of the API to be queried
* @param communityModel the communityModel as replied by the APIs
* @return the community set with information from the community model and for the content providers
*/
private static Community getCommunity(String baseURL, CommunityModel communityModel) {
Community community = getCommunity(communityModel);
community.setProviders(getCommunityContentProviders(() -> {
try {
return QueryCommunityAPI.communityDatasource(community.getId(), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
return community;
}
/**
* extends the community configuration for the subcommunity by adding the content providers
* @param baseURL
* @param communityId
* @param sc
* @return
*/
private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId,
SubCommunityModel sc) {
Community c = getCommunity(sc);
c.setProviders(getCommunityContentProviders(() -> {
try {
return QueryCommunityAPI.subcommunityDatasource(communityId, sc.getSubCommunityId(), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
return c;
}
/**
* Gets all the sub-communities for a given community identifier
* @param communityId
* @param baseURL
* @return
*/
private static List<Community> getSubCommunity(String communityId, String baseURL) {
try {
List<SubCommunityModel> subcommunities = getSubcommunities(communityId, baseURL);
return subcommunities
.stream()
.map(sc -> getSubCommunityConfiguration(baseURL, communityId, sc))
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* prepare the configuration for the communities and sub-communities
* @param baseURL
* @return
* @throws IOException
*/
public static CommunityConfiguration getCommunityConfiguration(String baseURL) throws IOException {
final Map<String, Community> communities = Maps.newHashMap();
List<CommunityModel> communityList = getValidCommunities(baseURL);
List<Community> validCommunities = new ArrayList<>();
communityList.forEach(community -> {
validCommunities.add(getCommunity(baseURL, community));
validCommunities.addAll(getSubCommunity(community.getId(), baseURL));
});
validCommunities.forEach(community -> {
if (community.isValid())
communities.put(community.getId(), community);
});
return new CommunityConfiguration(communities);
}
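A small usage sketch of the method above, assuming the development endpoint exercised in BulkTagJobTest further down; both community ids and sub-community ids end up as keys of the resulting configuration.

import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;

public class CommunityConfigurationExample {
	public static void main(String[] args) throws Exception {
		CommunityConfiguration cc = Utils
			.getCommunityConfiguration("https://dev-openaire.d4science.org/openaire/community/");
		// prints one line per community or sub-community that passed the validity check
		cc.getCommunities().keySet().forEach(System.out::println);
	}
}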
/**
* fills the common fields in the community model for both the community configuration and the subcommunity configuration
* @param input
* @return
* @param <C>
*/
private static <C extends CommonConfigurationModel> Community getCommonConfiguration(C input) {
Community c = new Community();
c.setZenodoCommunities(input.getOtherZenodoCommunities());
if (StringUtils.isNotBlank(input.getZenodoCommunity()))
c.getZenodoCommunities().add(input.getZenodoCommunity());
c.setSubjects(input.getSubjects());
if (input.getFos() != null)
c.getSubjects().addAll(input.getFos());
if (input.getSdg() != null)
c.getSubjects().addAll(input.getSdg());
if (input.getAdvancedConstraints() != null) {
c.setConstraints(input.getAdvancedConstraints());
c.getConstraints().setSelection(resolver);
}
if (input.getRemoveConstraints() != null) {
c.setRemoveConstraints(input.getRemoveConstraints());
c.getRemoveConstraints().setSelection(resolver);
}
return c;
}
private static Community getCommunity(SubCommunityModel sc) {
Community c = getCommonConfiguration(sc);
c.setId(sc.getSubCommunityId());
return c;
}
private static Community getCommunity(CommunityModel cm) {
Community c = getCommonConfiguration(cm);
c.setId(cm.getId());
return c;
}
public static List<SubCommunityModel> getSubcommunities(String communityId, String baseURL) throws IOException {
return MAPPER
.readValue(
QueryCommunityAPI.subcommunities(communityId, baseURL), new TypeReference<List<SubCommunityModel>>() {
});
}
public static CommunityEntityMap getOrganizationCommunityMap(String baseURL) throws IOException {
return MAPPER
.readValue(QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL), CommunityEntityMap.class);
}
public static CommunityEntityMap getDatasourceCommunityMap(String baseURL) throws IOException {
return MAPPER.readValue(QueryCommunityAPI.propagationDatasourceCommunityMap(baseURL), CommunityEntityMap.class);
}
private static void getRelatedOrganizations(String communityId, String baseURL,
CommunityEntityMap communityEntityMap) {
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL),
EntityIdentifierList.class);
associatedOrgs
.forEach(
o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static void getRelatedOrganizations(String communityId, String subcommunityId, String baseURL,
CommunityEntityMap communityEntityMap) {
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL),
EntityIdentifierList.class);
associatedOrgs
.forEach(
o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* it returns for each organization the list of associated communities
*/
public static CommunityEntityMap getCommunityOrganization(String baseURL) throws IOException {
CommunityEntityMap organizationMap = new CommunityEntityMap();
String entityPrefix = ModelSupport.getIdPrefix(Organization.class);
getValidCommunities(baseURL)
.forEach(community -> {
List<CommunityModel> communityList = getValidCommunities(baseURL);
communityList.forEach(community -> {
getRelatedOrganizations(community.getId(), baseURL, organizationMap);
String id = community.getId();
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.communityPropagationOrganization(id, baseURL), OrganizationList.class);
associatedOrgs.forEach(o -> {
if (!organizationMap
.keySet()
.contains(
entityPrefix + "|" + o))
organizationMap.put(entityPrefix + "|" + o, new ArrayList<>());
organizationMap.get(entityPrefix + "|" + o).add(community.getId());
});
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return organizationMap;
}
public static CommunityEntityMap getCommunityProjects(String baseURL) throws IOException {
CommunityEntityMap projectMap = new CommunityEntityMap();
String entityPrefix = ModelSupport.getIdPrefix(Project.class);
getValidCommunities(baseURL)
.forEach(community -> {
int page = -1;
int size = 100;
ContentModel cm = new ContentModel();
do {
page++;
try {
cm = MAPPER
.readValue(
QueryCommunityAPI
.communityProjects(
community.getId(), String.valueOf(page), String.valueOf(size), baseURL),
ContentModel.class);
if (cm.getContent().size() > 0) {
cm.getContent().forEach(p -> {
if (!projectMap.keySet().contains(entityPrefix + "|" + p.getOpenaireId()))
projectMap.put(entityPrefix + "|" + p.getOpenaireId(), new ArrayList<>());
projectMap.get(entityPrefix + "|" + p.getOpenaireId()).add(community.getId());
});
}
} catch (IOException e) {
throw new RuntimeException(e);
}
} while (!cm.getLast());
});
return projectMap;
}
public static List<String> getCommunityIdList(String baseURL) throws IOException {
return getValidCommunities(baseURL)
.stream()
.map(CommunityModel::getId)
.collect(Collectors.toList());
}
public static List<EntityCommunities> getDatasourceCommunities(String baseURL) throws IOException {
List<CommunityModel> validCommunities = getValidCommunities(baseURL);
HashMap<String, Set<String>> map = new HashMap<>();
String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|";
validCommunities.forEach(c -> {
try {
new ObjectMapper()
.readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
.forEach(d -> {
if (!map.keySet().contains(d.getOpenaireId()))
map.put(d.getOpenaireId(), new HashSet<>());
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
subcommunities
.forEach(
sc -> getRelatedOrganizations(
community.getId(), sc.getSubCommunityId(), baseURL, organizationMap));
map.get(d.getOpenaireId()).add(c.getId());
});
} catch (IOException e) {
throw new RuntimeException(e);
}
});
List<EntityCommunities> temp = map
return organizationMap;
.keySet()
.stream()
.map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map)))
.collect(Collectors.toList());
return temp;
}
@NotNull
private static List<String> getCollect(String k, HashMap<String, Set<String>> map) {
List<String> temp = map.get(k).stream().collect(Collectors.toList());
return temp;
public static List<String> getCommunityIdList(String baseURL) throws IOException {
return getValidCommunities(baseURL)
.stream()
.flatMap(communityModel -> {
List<String> communityIds = new ArrayList<>();
communityIds.add(communityModel.getId());
try {
Utils
.getSubcommunities(communityModel.getId(), baseURL)
.forEach(sc -> communityIds.add(sc.getSubCommunityId()));
} catch (IOException e) {
throw new RuntimeException(e);
}
return communityIds.stream();
})
.collect(Collectors.toList());
}
}
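To round off the picture, a hedged sketch of how the three propagation maps built above are obtained together; the base URL is the production value from the job properties and is only illustrative here.

import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;

public class PropagationMapsExample {
	public static void main(String[] args) throws Exception {
		final String baseURL = "https://services.openaire.eu/openaire/community/";
		// project -> communities, paging through communities and their sub-communities
		CommunityEntityMap projects = Utils.getProjectCommunityMap(baseURL);
		// organization -> communities, served by the propagation endpoint
		CommunityEntityMap organizations = Utils.getOrganizationCommunityMap(baseURL);
		// datasource -> communities, same pattern
		CommunityEntityMap datasources = Utils.getDatasourceCommunityMap(baseURL);
		System.out
			.printf(
				"%d projects, %d organizations, %d datasources%n", projects.size(), organizations.size(),
				datasources.size());
	}
}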

View File

@@ -0,0 +1,76 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommonConfigurationModel implements Serializable {
private String zenodoCommunity;
private List<String> subjects;
private List<String> otherZenodoCommunities;
private List<String> fos;
private List<String> sdg;
private SelectionConstraints advancedConstraints;
private SelectionConstraints removeConstraints;
public String getZenodoCommunity() {
return zenodoCommunity;
}
public void setZenodoCommunity(String zenodoCommunity) {
this.zenodoCommunity = zenodoCommunity;
}
public List<String> getSubjects() {
return subjects;
}
public void setSubjects(List<String> subjects) {
this.subjects = subjects;
}
public List<String> getOtherZenodoCommunities() {
return otherZenodoCommunities;
}
public void setOtherZenodoCommunities(List<String> otherZenodoCommunities) {
this.otherZenodoCommunities = otherZenodoCommunities;
}
public List<String> getFos() {
return fos;
}
public void setFos(List<String> fos) {
this.fos = fos;
}
public List<String> getSdg() {
return sdg;
}
public void setSdg(List<String> sdg) {
this.sdg = sdg;
}
public SelectionConstraints getRemoveConstraints() {
return removeConstraints;
}
public void setRemoveConstraints(SelectionConstraints removeConstraints) {
this.removeConstraints = removeConstraints;
}
public SelectionConstraints getAdvancedConstraints() {
return advancedConstraints;
}
public void setAdvancedConstraints(SelectionConstraints advancedConstraints) {
this.advancedConstraints = advancedConstraints;
}
}

View File

@@ -18,4 +18,12 @@ public class CommunityEntityMap extends HashMap<String, List<String>> {
}
return super.get(key);
}
public void add(String key, String value) {
if (!super.containsKey(key)) {
super.put(key, new ArrayList<>());
}
super.get(key).add(value);
}
}
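A short illustration of the new add helper (identifiers are made up): it lazily creates the bucket for a key, so callers no longer need the containsKey/put dance removed elsewhere in this commit.

import eu.dnetlib.dhp.api.model.CommunityEntityMap;

public class CommunityEntityMapExample {
	public static void main(String[] args) {
		CommunityEntityMap map = new CommunityEntityMap();
		// the same key can be added repeatedly; values accumulate in its list
		map.add("20|openorgs____::placeholder", "community-a");
		map.add("20|openorgs____::placeholder", "community-b");
		System.out.println(map.get("20|openorgs____::placeholder")); // [community-a, community-b]
	}
}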

View File

@@ -13,75 +13,11 @@ import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
* @Date 06/10/23
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommunityModel implements Serializable {
public class CommunityModel extends CommonConfigurationModel implements Serializable {
private String id;
private String type;
private String status;
private String zenodoCommunity;
private List<String> subjects;
private List<String> otherZenodoCommunities;
private List<String> fos;
private List<String> sdg;
private SelectionConstraints advancedConstraints;
private SelectionConstraints removeConstraints;
public String getZenodoCommunity() {
return zenodoCommunity;
}
public void setZenodoCommunity(String zenodoCommunity) {
this.zenodoCommunity = zenodoCommunity;
}
public List<String> getSubjects() {
return subjects;
}
public void setSubjects(List<String> subjects) {
this.subjects = subjects;
}
public List<String> getOtherZenodoCommunities() {
return otherZenodoCommunities;
}
public void setOtherZenodoCommunities(List<String> otherZenodoCommunities) {
this.otherZenodoCommunities = otherZenodoCommunities;
}
public List<String> getFos() {
return fos;
}
public void setFos(List<String> fos) {
this.fos = fos;
}
public List<String> getSdg() {
return sdg;
}
public void setSdg(List<String> sdg) {
this.sdg = sdg;
}
public SelectionConstraints getRemoveConstraints() {
return removeConstraints;
}
public void setRemoveConstraints(SelectionConstraints removeConstraints) {
this.removeConstraints = removeConstraints;
}
public SelectionConstraints getAdvancedConstraints() {
return advancedConstraints;
}
public void setAdvancedConstraints(SelectionConstraints advancedConstraints) {
this.advancedConstraints = advancedConstraints;
}
public String getId() {
return id;
}

View File

@@ -1,15 +0,0 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
/**
* @author miriam.baglioni
* @Date 06/10/23
*/
public class CommunitySummary extends ArrayList<CommunityModel> implements Serializable {
public CommunitySummary() {
super();
}
}

View File

@@ -1,13 +0,0 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
import eu.dnetlib.dhp.api.model.CommunityContentprovider;
public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable {
public DatasourceList() {
super();
}
}

View File

@@ -8,9 +8,9 @@ import java.util.ArrayList;
* @author miriam.baglioni
* @Date 09/10/23
*/
public class OrganizationList extends ArrayList<String> implements Serializable {
public OrganizationList() {
public class EntityIdentifierList extends ArrayList<String> implements Serializable {
public EntityIdentifierList() {
super();
}
}

View File

@@ -0,0 +1,19 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class SubCommunityModel extends CommonConfigurationModel implements Serializable {
private String subCommunityId;
public String getSubCommunityId() {
return subCommunityId;
}
public void setSubCommunityId(String subCommunityId) {
this.subCommunityId = subCommunityId;
}
}

View File

@@ -15,10 +15,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +29,8 @@ import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.api.model.EntityCommunities;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@@ -88,6 +88,14 @@ public class SparkBulkTagJob {
log.info("protoMap: {}", temp);
ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class);
log.info("pathMap: {}", new Gson().toJson(protoMap));
final String dbUrl = parser.get("dbUrl");
log.info("dbUrl: {}", dbUrl);
final String dbUser = parser.get("dbUser");
log.info("dbUser: {}", dbUser);
final String dbPassword = parser.get("dbPassword");
log.info("dbPassword: {}", dbPassword);
final String hdfsPath = outputPath + "masterDuplicate";
log.info("hdfsPath: {}", hdfsPath);
SparkConf conf = new SparkConf();
CommunityConfiguration cc;
@@ -101,7 +109,7 @@ public class SparkBulkTagJob {
cc = CommunityConfigurationFactory.newInstance(taggingConf);
} else {
cc = Utils.getCommunityConfiguration(baseURL);
log.info(OBJECT_MAPPER.writeValueAsString(cc));
}
runWithSparkSession(
@@ -109,20 +117,98 @@ public class SparkBulkTagJob {
isSparkSessionManaged,
spark -> {
extendCommunityConfigurationForEOSC(spark, inputPath, cc);
ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode);
execBulkTag(
spark, inputPath, outputPath, protoMap, cc);
execEntityTag(
spark, inputPath + "organization", outputPath + "organization",
Utils.getCommunityOrganization(baseURL), Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
mapWithRepresentativeOrganization(
spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
execEntityTag(
spark, inputPath + "project", outputPath + "project", Utils.getCommunityProjects(baseURL),
spark, inputPath + "project", outputPath + "project",
Utils.getProjectCommunityMap(baseURL),
Project.class, TaggingConstants.CLASS_ID_PROJECT, TaggingConstants.CLASS_NAME_BULKTAG_PROJECT);
execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
execEntityTag(
spark, inputPath + "datasource", outputPath + "datasource",
mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunityMap(baseURL)),
Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE,
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
});
}
private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath,
CommunityEntityMap datasourceCommunityMap) {
// load master-duplicate relations
Dataset<MasterDuplicate> masterDuplicate = spark
.read()
.schema(Encoders.bean(MasterDuplicate.class).schema())
.json(masterDuplicatePath)
.as(Encoders.bean(MasterDuplicate.class));
// list of id for the communities related entities
List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Datasource.class), datasourceCommunityMap);
// find the mapping with the representative entity if any
Dataset<String> datasourceIdentifiers = spark.createDataset(idList, Encoders.STRING());
List<Row> mappedKeys = datasourceIdentifiers
.join(
masterDuplicate, datasourceIdentifiers.col("_1").equalTo(masterDuplicate.col("duplicateId")),
"left_semi")
.selectExpr("masterId as source", "duplicateId as target")
.collectAsList();
// remap the entity with its corresponding representative
return remapCommunityEntityMap(datasourceCommunityMap, mappedKeys);
}
private static List<String> entityIdList(String idPrefixMap, CommunityEntityMap datasourceCommunityMap) {
final String prefix = idPrefixMap + "|";
return datasourceCommunityMap
.keySet()
.stream()
.map(key -> prefix + key)
.collect(Collectors.toList());
}
private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath,
CommunityEntityMap organizationCommunityMap) {
Dataset<Row> mergesRel = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(relationPath)
.filter("datainfo.deletedbyinference != true and relClass = 'merges")
.select("source", "target");
List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Organization.class), organizationCommunityMap);
Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING());
List<Row> mappedKeys = organizationIdentifiers
.join(mergesRel, organizationIdentifiers.col("_1").equalTo(mergesRel.col("target")), "left_semi")
.select("source", "target")
.collectAsList();
return remapCommunityEntityMap(organizationCommunityMap, mappedKeys);
}
private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap,
List<Row> mappedKeys) {
for (Row mappedEntry : mappedKeys) {
String oldKey = mappedEntry.getAs("target");
String newKey = mappedEntry.getAs("source");
// inserts the newKey in the map while removing the oldKey. The remove produces the value in the Map, which
// will be used as the newValue parameter of the BiFunction
entityCommunityMap.merge(newKey, entityCommunityMap.remove(oldKey), (existing, newValue) -> {
existing.addAll(newValue);
return existing;
});
}
return entityCommunityMap;
}
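A hand-run of the remove/merge idiom used by remapCommunityEntityMap, on a toy map with placeholder identifiers: the duplicate key disappears and its communities are folded into the representative's entry.

import java.util.List;

import eu.dnetlib.dhp.api.model.CommunityEntityMap;

public class RemapExample {
	public static void main(String[] args) {
		CommunityEntityMap map = new CommunityEntityMap();
		map.add("10|duplicate___::0001", "community-a");
		map.add("10|master______::0001", "community-b");
		// same idiom as remapCommunityEntityMap: remove the duplicate key and fold its
		// communities into the entry of its representative
		List<String> moved = map.remove("10|duplicate___::0001");
		map.merge("10|master______::0001", moved, (existing, incoming) -> {
			existing.addAll(incoming);
			return existing;
		});
		System.out.println(map); // {10|master______::0001=[community-b, community-a]}
	}
}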
private static <E extends OafEntity> void execEntityTag(SparkSession spark, String inputPath, String outputPath,
CommunityEntityMap communityEntity, Class<E> entityClass,
String classID, String calssName) {
@@ -184,63 +270,6 @@ public class SparkBulkTagJob {
.json(inputPath);
}
private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath,
List<EntityCommunities> datasourceCommunities) {
Dataset<Datasource> datasource = readPath(spark, inputPath + "datasource", Datasource.class);
Dataset<EntityCommunities> dc = spark
.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
datasource
.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
.map((MapFunction<Tuple2<Datasource, EntityCommunities>, Datasource>) t2 -> {
Datasource ds = t2._1();
if (t2._2() != null) {
List<String> context = Optional
.ofNullable(ds.getContext())
.map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
.orElse(new ArrayList<>());
if (!Optional.ofNullable(ds.getContext()).isPresent())
ds.setContext(new ArrayList<>());
t2._2().getCommunitiesId().forEach(c -> {
if (!context.contains(c)) {
Context con = new Context();
con.setId(c);
con
.setDataInfo(
Arrays
.asList(
OafMapperUtils
.dataInfo(
false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
TaggingConstants.CLASS_ID_DATASOURCE,
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"1")));
ds.getContext().add(con);
}
});
}
return ds;
}, Encoders.bean(Datasource.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "datasource");
readPath(spark, outputPath + "datasource", Datasource.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "datasource");
}
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
CommunityConfiguration cc) {
@@ -278,11 +307,6 @@ public class SparkBulkTagJob {
ProtoMap protoMappingParams,
CommunityConfiguration communityConfiguration) {
try {
System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
ModelSupport.entityTypes
.keySet()
.parallelStream()

View File

@@ -43,7 +43,8 @@ public class Community implements Serializable {
}
public void setSubjects(List<String> subjects) {
this.subjects = subjects;
if (subjects != null)
this.subjects = subjects;
}
public List<Provider> getProviders() {
@@ -59,7 +60,8 @@ public class Community implements Serializable {
}
public void setZenodoCommunities(List<String> zenodoCommunities) {
this.zenodoCommunities = zenodoCommunities;
if (zenodoCommunities != null)
this.zenodoCommunities = zenodoCommunities;
}
public SelectionConstraints getConstraints() {

View File

@@ -12,10 +12,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,10 +81,12 @@ public class SparkCountryPropagationJob {
Dataset<R> res = readPath(spark, sourcePath, resultClazz);
log.info("Reading prepared info: {}", preparedInfoPath);
Encoder<ResultCountrySet> rcsEncoder = Encoders.bean(ResultCountrySet.class);
Dataset<ResultCountrySet> prepared = spark
.read()
.schema(rcsEncoder.schema())
.json(preparedInfoPath)
.as(Encoders.bean(ResultCountrySet.class));
.as(rcsEncoder);
res
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")

View File

@@ -52,6 +52,7 @@ public class PrepareResultCommunitySet {
log.info("baseURL: {}", baseURL);
final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(baseURL);
// final CommunityEntityMap organizationMap = Utils.getOrganizationCommunityMap(baseURL);
log.info("organizationMap: {}", new Gson().toJson(organizationMap));
SparkConf conf = new SparkConf();

View File

@@ -2,13 +2,11 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -18,16 +16,10 @@ import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultOrganizations;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareResultCommunitySet {
@@ -55,7 +47,7 @@ public class PrepareResultCommunitySet {
final String baseURL = parser.get("baseURL");
log.info("baseURL: {}", baseURL);
final CommunityEntityMap projectsMap = Utils.getCommunityProjects(baseURL);
final CommunityEntityMap projectsMap = Utils.getProjectCommunityMap(baseURL);
SparkConf conf = new SparkConf();

View File

@@ -28,4 +28,7 @@ blacklist=empty
allowedpids=orcid;orcid_pending
baseURL = https://services.openaire.eu/openaire/community/
iterations=1
dbUrl=jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus
dbUser=dnet
dbPassword=dnetPwd

View File

@@ -170,6 +170,18 @@
<name>pathMap</name>
<value>${pathMap}</value>
</property>
<property>
<name>dbUrl</name>
<value>${dbUrl}</value>
</property>
<property>
<name>dbUser</name>
<value>${dbUser}</value>
</property>
<property>
<name>dbPassword</name>
<value>${dbPassword}</value>
</property>
</configuration>
</sub-workflow>
<ok to="affiliation_inst_repo" />

View File

@@ -39,5 +39,22 @@
"paramLongName": "nameNode",
"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
"paramRequired": true
},
{
"paramName": "du",
"paramLongName": "dbUrl",
"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
"paramRequired": true
},{
"paramName": "dus",
"paramLongName": "dbUser",
"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
"paramRequired": true
},{
"paramName": "dp",
"paramLongName": "dbPassword",
"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
"paramRequired": true
}
]

View File

@@ -16,6 +16,18 @@
<name>startFrom></name>
<value>undelete</value>
</property>
<property>
<name>dbUrl</name>
</property>
<property>
<name>dbUser</name>
</property>
<property>
<name>dbPassword</name>
</property>
</parameters>
@@ -77,6 +89,9 @@
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--baseURL</arg><arg>${baseURL}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPassword</arg><arg>${dbPassword}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@@ -6,7 +6,6 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
@@ -18,7 +17,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -29,9 +27,13 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.SubCommunityModel;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
import eu.dnetlib.dhp.schema.oaf.*;
@@ -1949,4 +1951,21 @@ public class BulkTagJobTest {
}
@Test
public void testApi() throws IOException {
String baseURL = "https://dev-openaire.d4science.org/openaire/community/";
List<SubCommunityModel> subcommunities = Utils.getSubcommunities("clarin", baseURL);
CommunityConfiguration tmp = Utils.getCommunityConfiguration(baseURL);
tmp.getCommunities().keySet().forEach(c -> {
try {
System.out.println(new ObjectMapper().writeValueAsString(tmp.getCommunities().get(c)));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
System.out.println(new ObjectMapper().writeValueAsString(Utils.getOrganizationCommunityMap(baseURL)));
}
}

View File

@@ -0,0 +1,80 @@
package eu.dnetlib.dhp.oa.graph.hive;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
public class GraphHiveTableExporterJob {
private static final Logger log = LoggerFactory.getLogger(GraphHiveTableExporterJob.class);
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GraphHiveTableExporterJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(-1);
log.info("numPartitions: {}", numPartitions);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String hiveTableName = parser.get("hiveTableName");
log.info("hiveTableName: {}", hiveTableName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
String mode = parser.get("mode");
log.info("mode: {}", mode);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
runWithSparkHiveSession(
conf, isSparkSessionManaged,
spark -> saveGraphTable(spark, outputPath, hiveTableName, mode, numPartitions));
}
// protected for testing
private static <T extends Oaf> void saveGraphTable(SparkSession spark, String outputPath, String hiveTableName,
String mode, int numPartitions) {
Dataset<Row> dataset = spark.table(hiveTableName);
if (numPartitions > 0) {
log.info("repartitioning to {} partitions", numPartitions);
dataset = dataset.repartition(numPartitions);
}
dataset
.write()
.mode(mode)
.option("compression", "gzip")
.json(outputPath);
}
}

View File

@@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "mode",
"paramLongName": "mode",
"paramDescription": "mode (append|overwrite)",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveTableName",
"paramDescription": "the input hive table identifier",
"paramRequired": true
}
]
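
For orientation, a minimal sketch of how these parameters might be wired together when launching GraphHiveTableExporterJob outside of Oozie; the output path, table name and metastore URI below are illustrative placeholders, not values taken from this commit.

import eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob;

public class GraphHiveTableExporterJobExample {
    public static void main(String[] args) throws Exception {
        // placeholder values; in production these come from the Oozie spark action arguments
        GraphHiveTableExporterJob.main(new String[] {
            "--isSparkSessionManaged", "true",
            "--outputPath", "/tmp/export/publication",
            "--mode", "append",
            "--hiveTableName", "graph_db.publication",
            "--hiveMetastoreUris", "thrift://example-metastore:9083"
        });
    }
}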

View File

@ -1,12 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
@ -54,48 +52,60 @@ object CopyHdfsOafSparkApplication {
    val hdfsPath = parser.get("hdfsPath")
    log.info("hdfsPath: {}", hdfsPath)

    val paths =
      DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala

    val validPaths: List[String] =
      paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList

    if (validPaths.nonEmpty) {
      val oaf = spark.read
        .textFile(validPaths: _*)
        .map(v => (getOafType(v), v))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
        .cache()

      try {
        ModelSupport.oafTypes
          .keySet()
          .asScala
          .foreach(entity =>
            oaf
              .filter(s"_1 = '${entity}'")
              .selectExpr("_2")
              .write
              .option("compression", "gzip")
              .mode(SaveMode.Append)
              .text(s"$hdfsPath/${entity}")
          )
      } finally {
        oaf.unpersist()
      }
    }
  }

  def getOafType(input: String): String = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: org.json4s.JValue = parse(input)

    val hasId = (json \ "id").extractOrElse[String](null)
    val hasSource = (json \ "source").extractOrElse[String](null)
    val hasTarget = (json \ "target").extractOrElse[String](null)

    if (hasId == null && hasSource != null && hasTarget != null) {
      "relation"
    } else if (hasId != null) {
      val oafType: String = ModelSupport.idPrefixEntity.get(hasId.substring(0, 2))
      oafType match {
        case "result" =>
          (json \ "resulttype" \ "classid").extractOrElse[String](null) match {
            case "other" => "otherresearchproduct"
            case any     => any
          }
        case _ => oafType
      }
    } else {
      null
    }
  }
}
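
As a quick illustration of the classification implemented by getOafType, a hedged Java sketch using inline JSON fragments; the sample records are made up and only carry the fields the method inspects.

import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;

public class GetOafTypeExample {
    public static void main(String[] args) {
        // no "id", but "source" and "target": classified as a relation
        String rel = "{\"source\":\"50|doi_________::abc\",\"target\":\"40|corda_______::def\"}";
        // "id" prefix 50 maps to result; resulttype.classid selects the concrete type
        String pub = "{\"id\":\"50|doi_________::abc\",\"resulttype\":{\"classid\":\"publication\"}}";
        String orp = "{\"id\":\"50|doi_________::abc\",\"resulttype\":{\"classid\":\"other\"}}";

        System.out.println(CopyHdfsOafSparkApplication.getOafType(rel)); // relation
        System.out.println(CopyHdfsOafSparkApplication.getOafType(pub)); // publication
        System.out.println(CopyHdfsOafSparkApplication.getOafType(orp)); // otherresearchproduct
    }
}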

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw;

import static eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication.getOafType;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
@ -11,67 +11,24 @@ import org.junit.jupiter.api.Test;
public class CopyHdfsOafSparkApplicationTest {

  String getResourceAsStream(String path) throws IOException {
    return IOUtils.toString(getClass().getResourceAsStream(path));
  }

  @Test
  void testIsOafType() throws IOException {
    assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
    assertEquals("dataset", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")));
    assertEquals("relation", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")));
    assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
    assertEquals(
      "publication",
      getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")));
  }

  @Test
  void isOafType_Datacite_ORP() throws IOException {
    assertEquals(
      "otherresearchproduct", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")));
  }
}

View File

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-incremental-graph</artifactId>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${net.alchim31.maven.version}</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>scala-doc</id>
<phase>process-resources</phase> <!-- or wherever -->
<goals>
<goal>doc</goal>
</goals>
</execution>
</executions>
<configuration>
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-aggregation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-enrichment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-graph-mapper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,161 @@
package eu.dnetlib.dhp.incremental;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static org.apache.spark.sql.functions.udf;
import java.util.Collections;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;
import eu.dnetlib.dhp.oozie.RunSQLSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import scala.collection.JavaConversions;
public class CollectNewOafResults {
private static final Logger log = LoggerFactory.getLogger(CollectNewOafResults.class);
private final ArgumentApplicationParser parser;
public CollectNewOafResults(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CollectNewOafResults.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String wrkdirPath = parser.get("workingDir");
log.info("workingDir is {}", wrkdirPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath is {}", outputPath);
final String mdStoreManagerURI = parser.get("mdStoreManagerURI");
log.info("mdStoreManagerURI is {}", mdStoreManagerURI);
final String mdStoreID = parser.get("mdStoreID");
if (StringUtils.isBlank(mdStoreID)) {
throw new IllegalArgumentException("missing or empty argument mdStoreID");
}
final String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName is {}", hiveDbName);
final MDStoreVersion currentVersion = DNetRestClient
.doGET(String.format(MDStoreActionNode.READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
log.info("mdstore data is {}", currentVersion.toString());
try {
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
// ids in the current graph
Dataset<Row> currentIds = spark
.table(hiveDbName + ".result")
.select("id")
.union(
spark
.table(hiveDbName + ".relation")
.where("relClass = 'merges'")
.selectExpr("target as id"))
.distinct();
UserDefinedFunction getOafType = udf(
(String json) -> CopyHdfsOafSparkApplication.getOafType(json), DataTypes.StringType);
// new collected ids
spark
.read()
.text(currentVersion.getHdfsPath() + "/store")
.selectExpr(
"value",
"get_json_object(value, '$.id') AS id")
.where("id IS NOT NULL")
.join(currentIds, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti")
.withColumn("oaftype", getOafType.apply(new Column("value")))
.write()
.partitionBy("oaftype")
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.parquet(wrkdirPath + "/entities");
ModelSupport.oafTypes
.keySet()
.forEach(
entity -> spark
.read()
.parquet(wrkdirPath + "/entities")
.filter("oaftype = '" + entity + "'")
.select("value")
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(outputPath + "/" + entity));
Dataset<Row> newIds = spark.read().parquet(wrkdirPath + "/entities").select("id");
Dataset<Row> rels = spark
.read()
.text(currentVersion.getHdfsPath() + "/store")
.selectExpr(
"value",
"get_json_object(value, '$.source') AS source",
"get_json_object(value, '$.target') AS target")
.where("source IS NOT NULL AND target IS NOT NULL");
rels
.join(
newIds.selectExpr("id as source"),
JavaConversions.asScalaBuffer(Collections.singletonList("source")), "left_semi")
.union(
rels
.join(
newIds.selectExpr("id as target"),
JavaConversions.asScalaBuffer(Collections.singletonList("target")), "left_semi"))
.distinct()
.select("value")
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(outputPath + "/relation");
});
} finally {
DNetRestClient
.doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, mdStoreManagerURI, currentVersion.getId()));
}
}
}
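
The heart of the job is the pair of joins: a left_anti join keeps only the collected ids that are unknown to the current graph, and left_semi joins keep only the relations that touch one of those new ids. Below is a hedged, self-contained sketch of that pattern on toy data; the class name and all values are invented for illustration.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class IncrementalJoinSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("join-sketch").master("local[*]").getOrCreate();

        // ids already present in the current graph
        Dataset<Row> currentIds = spark.createDataset(Arrays.asList("id1", "id2"), Encoders.STRING()).toDF("id");
        // ids found in the newly collected records
        Dataset<Row> collectedIds = spark.createDataset(Arrays.asList("id2", "id3"), Encoders.STRING()).toDF("id");

        // left_anti: keep only the collected ids unknown to the current graph -> id3
        Dataset<Row> newIds = collectedIds
            .join(currentIds, collectedIds.col("id").equalTo(currentIds.col("id")), "left_anti");
        newIds.show();

        // toy relations as (source, target) pairs
        Dataset<Row> rels = spark.createDataFrame(
            Arrays.asList(RowFactory.create("id1", "id2"), RowFactory.create("id2", "id3")),
            new StructType().add("source", DataTypes.StringType).add("target", DataTypes.StringType));

        // left_semi on source or target: keep only relations touching a new id -> (id2, id3)
        Dataset<Row> bySource = rels.join(newIds, rels.col("source").equalTo(newIds.col("id")), "left_semi");
        Dataset<Row> byTarget = rels.join(newIds, rels.col("target").equalTo(newIds.col("id")), "left_semi");
        bySource.union(byTarget).distinct().show();

        spark.stop();
    }
}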

View File

@ -0,0 +1,96 @@
package eu.dnetlib.dhp.incremental
import eu.dnetlib.dhp.PropagationConstant
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}
object SparkAppendContextCleanedGraph {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(
IOUtils.toString(
getClass.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
)
)
)
parser.parseArgument(args)
conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
val graphBasePath = parser.get("graphBasePath")
log.info(s"graphBasePath -> $graphBasePath")
val relationPath = parser.get("relationPath")
log.info(s"relationPath -> $relationPath")
val targetPath = parser.get("targetGraph")
log.info(s"targetGraph -> $targetPath")
val hiveDbName = parser.get("hiveDbName")
log.info(s"hiveDbName -> $hiveDbName")
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.appName(getClass.getSimpleName)
.getOrCreate()
for ((entity, clazz) <- ModelSupport.oafTypes.asScala) {
if (classOf[OafEntity].isAssignableFrom(clazz)) {
val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]
spark
.table(s"${hiveDbName}.${entity}")
.as(classEnc)
.map(e => {
val oaf = e.asInstanceOf[OafEntity]
if (oaf.getContext != null) {
val newContext = oaf.getContext.asScala
.map(c => {
if (c.getDataInfo != null) {
c.setDataInfo(
c.getDataInfo.asScala
.filter(
di =>
!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
&& !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
)
.toList
.asJava
)
}
c
})
.filter(!_.getDataInfo.isEmpty)
.toList
.asJava
oaf.setContext(newContext)
}
e
})(classEnc)
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.json(s"$targetPath/${entity}")
} else {
spark
.table(s"${hiveDbName}.${entity}")
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.json(s"$targetPath/${entity}")
}
}
}
}

View File

@ -0,0 +1,89 @@
package eu.dnetlib.dhp.incremental
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, expr}
import org.slf4j.{Logger, LoggerFactory}
object SparkResolveRelationById {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(
IOUtils.toString(
getClass.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
)
)
)
parser.parseArgument(args)
conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
val graphBasePath = parser.get("graphBasePath")
log.info(s"graphBasePath -> $graphBasePath")
val relationPath = parser.get("relationPath")
log.info(s"relationPath -> $relationPath")
val targetPath = parser.get("targetGraph")
log.info(s"targetGraph -> $targetPath")
val hiveDbName = parser.get("hiveDbName")
log.info(s"hiveDbName -> $hiveDbName")
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.appName(getClass.getSimpleName)
.getOrCreate()
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
val mergedrels =
spark.table(s"${hiveDbName}.relation").where("relclass = 'merges'").selectExpr("source as dedupId", "target as mergedId")
spark.read
.schema(Encoders.bean(classOf[Relation]).schema)
.json(s"$graphBasePath/relation")
.as[Relation]
.map(r => resolveRelations(r))
.join(mergedrels, col("source") === mergedrels.col("mergedId"), "left")
.withColumn("source", expr("coalesce(dedupId, source)"))
.drop("mergedId", "dedupID")
.join(mergedrels, col("target") === mergedrels.col("mergedId"), "left")
.withColumn("target", expr("coalesce(dedupId, target)"))
.drop("mergedId", "dedupID")
.write
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(s"$targetPath/relation")
}
private def resolveRelations(r: Relation): Relation = {
if (r.getSource.startsWith("unresolved::"))
r.setSource(resolvePid(r.getSource.substring(12)))
if (r.getTarget.startsWith("unresolved::"))
r.setTarget(resolvePid(r.getTarget.substring(12)))
r
}
private def resolvePid(str: String): String = {
val parts = str.split("::")
val id = parts(0)
val scheme: String = parts.last match {
case "arxiv" => "arXiv"
case _ => parts.last
}
IdentifierFactory.idFromPid("50", scheme, id, true)
}
}
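
To make the unresolved-identifier rewrite concrete, a hedged Java sketch of the same parsing steps applied to a single value; the pid below is made up, and IdentifierFactory.idFromPid is called with the same arguments used by resolvePid.

import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;

public class ResolveUnresolvedIdSketch {
    public static void main(String[] args) {
        // made-up unresolved id of the form unresolved::<pid>::<pidType>
        String source = "unresolved::2105.01234::arxiv";

        String body = source.substring("unresolved::".length()); // "2105.01234::arxiv"
        String[] parts = body.split("::");
        String pid = parts[0];                                    // "2105.01234"
        String type = parts[parts.length - 1];                    // "arxiv"
        String scheme = "arxiv".equals(type) ? "arXiv" : type;    // normalise the scheme name

        // same call used by resolvePid: builds a result ("50") identifier from the pid
        String resolved = IdentifierFactory.idFromPid("50", scheme, pid, true);
        System.out.println(resolved);
    }
}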

View File

@ -0,0 +1,44 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "mu",
"paramLongName": "mdStoreManagerURI",
"paramDescription": "the MDStore Manager URI",
"paramRequired": true
},
{
"paramName": "mi",
"paramLongName": "mdStoreID",
"paramDescription": "the Metadata Store ID",
"paramRequired": false
},
{
"paramName": "wd",
"paramLongName": "workingDir",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the graph hive database name",
"paramRequired": true
}
]

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
</configuration>

View File

@ -0,0 +1,77 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdStoreManagerURI</name>
<description>the path of the cleaned mdstore</description>
</property>
<property>
<name>mdStoreID</name>
<description>the identifier of the native MDStore</description>
</property>
<property>
<name>outputPath</name>
<description>outputDirectory</description>
</property>
<property>
<name>workingDir</name>
<description>outputDirectory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=8G --conf spark.executor.memoryOverhead=6G --executor-cores=6 --driver-memory=9G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="CollectJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CollectJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Collect New Oaf Results</name>
<class>eu.dnetlib.dhp.incremental.CollectNewOafResults</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/collect_new_results</arg>
<arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -0,0 +1,158 @@
<workflow-app name="import_graph_as_hive_DB" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>outputPath</name>
<description>the source path</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="fork_export"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_export">
<path start="export_publication"/>
<path start="export_dataset"/>
<path start="export_otherresearchproduct"/>
<path start="export_software"/>
<path start="export_relation"/>
</fork>
<action name="export_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge table publication</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--mode</arg><arg>append</arg>
<arg>--hiveTableName</arg><arg>${hiveDbName}.publication</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_export"/>
<error to="Kill"/>
</action>
<action name="export_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge table dataset</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--mode</arg><arg>append</arg>
<arg>--hiveTableName</arg><arg>${hiveDbName}.dataset</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_export"/>
<error to="Kill"/>
</action>
<action name="export_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge table otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--mode</arg><arg>append</arg>
<arg>--hiveTableName</arg><arg>${hiveDbName}.otherresearchproduct</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_export"/>
<error to="Kill"/>
</action>
<action name="export_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge table software</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--mode</arg><arg>append</arg>
<arg>--hiveTableName</arg><arg>${hiveDbName}.software</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_export"/>
<error to="Kill"/>
</action>
<action name="export_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge table relation</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
<arg>--mode</arg><arg>append</arg>
<arg>--hiveTableName</arg><arg>${hiveDbName}.relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_export"/>
<error to="Kill"/>
</action>
<join name="join_export" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

View File

@ -0,0 +1,11 @@
INSERT OVERWRITE DIRECTORY '${outputPath}/datasource'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`datasource`; /* EOS */
INSERT OVERWRITE DIRECTORY '${outputPath}/organization'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`organization`; /* EOS */
INSERT OVERWRITE DIRECTORY '${outputPath}/project'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`project`; /* EOS */

View File

@ -0,0 +1,69 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>outputPath</name>
<description>outputDirectory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
<property>
<name>action</name>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="MigrationJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="MigrationJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy data from last graph</name>
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/${action}.sql</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
</configuration>

View File

@ -0,0 +1,151 @@
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>input graph to resolve</description>
</property>
<property>
<name>targetGraph</name>
<description>outputDirectory</description>
</property>
<property>
<name>workingDir</name>
<description>outputDirectory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="ResolveJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResolveJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Relations</name>
<class>eu.dnetlib.dhp.incremental.SparkResolveRelationById</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--targetGraph</arg><arg>${targetGraph}</arg>
<arg>--workingDir</arg><arg>${workingDir}/resolve_relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
</spark>
<ok to="reset_outputpath"/>
<error to="Kill"/>
</action>
<action name="reset_outputpath">
<fs>
<delete path="${targetGraph}/dataset"/>
<delete path="${targetGraph}/datasource"/>
<delete path="${targetGraph}/organization"/>
<delete path="${targetGraph}/otherresearchproduct"/>
<delete path="${targetGraph}/person"/>
<delete path="${targetGraph}/project"/>
<delete path="${targetGraph}/publication"/>
<delete path="${targetGraph}/software"/>
</fs>
<ok to="copy_dataset"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/dataset</arg>
<arg>${nameNode}/${targetGraph}/dataset</arg>
</distcp>
<ok to="copy_datasource"/>
<error to="Kill"/>
</action>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/datasource</arg>
<arg>${nameNode}/${targetGraph}/datasource</arg>
</distcp>
<ok to="copy_organization"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/organization</arg>
<arg>${nameNode}/${targetGraph}/organization</arg>
</distcp>
<ok to="copy_otherresearchproduct"/>
<error to="Kill"/>
</action>
<action name="copy_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/otherresearchproduct</arg>
<arg>${nameNode}/${targetGraph}/otherresearchproduct</arg>
</distcp>
<ok to="copy_person"/>
<error to="Kill"/>
</action>
<action name="copy_person">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/person</arg>
<arg>${nameNode}/${targetGraph}/person</arg>
</distcp>
<ok to="copy_project"/>
<error to="Kill"/>
</action>
<action name="copy_project">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/project</arg>
<arg>${nameNode}/${targetGraph}/project</arg>
</distcp>
<ok to="copy_publication"/>
<error to="Kill"/>
</action>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/publication</arg>
<arg>${nameNode}/${targetGraph}/publication</arg>
</distcp>
<ok to="copy_software"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/software</arg>
<arg>${nameNode}/${targetGraph}/software</arg>
</distcp>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "g",
"paramLongName": "graphBasePath",
"paramDescription": "the path of the raw graph",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetGraph",
"paramDescription": "the target path",
"paramRequired": true
},
{
"paramName": "wd",
"paramLongName": "workingDir",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the graph hive database name",
"paramRequired": true
}
]

View File

@ -43,6 +43,7 @@
<module>dhp-doiboost</module>
<module>dhp-impact-indicators</module>
<module>dhp-swh</module>
<module>dhp-incremental-graph</module>
</modules>
<pluginRepositories>