new node and workflow to index on ES

This commit is contained in:
Enrico Ottonello 2020-06-16 02:36:16 +02:00
parent 42c3f5e885
commit db9b70feb0
16 changed files with 424 additions and 248 deletions
dnet-ariadneplus-graphdb-publisher
dnet-ariadneplus/src/main
java/eu/dnetlib/ariadneplus/workflows/nodes
resources/eu/dnetlib

View File

@ -33,6 +33,14 @@ public class BulkUpload {
private RestHighLevelClient client;
/**
 * Points this uploader at the given Elasticsearch host and target index.
 * <p>
 * The connection always uses port 9200 over plain HTTP. If a client was already created by a
 * previous initialisation, it is closed first so that its connections are not leaked.
 *
 * @param elasticSearchHostName  host name of the Elasticsearch node to connect to
 * @param elasticSearchIndexName name of the index that subsequent uploads will write to
 * @throws IOException if closing a previously created client fails
 */
public void init(String elasticSearchHostName, String elasticSearchIndexName) throws IOException {
    this.elasticSearchIndexName = elasticSearchIndexName;
    // A client may already exist (e.g. from the no-arg @PostConstruct init); release it before replacing it.
    if (client != null) {
        client.close();
    }
    client = new RestHighLevelClient(
            RestClient.builder(
                    new HttpHost(elasticSearchHostName, 9200, "http")));
}
@PostConstruct
public void init() throws IOException {
client = new RestHighLevelClient(

View File

@ -0,0 +1,52 @@
package eu.dnetlib.ariadneplus.elasticsearch.model;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import java.util.Map;
/**
 * Model bean for the archaeological resource type of a catalog entry: a numeric id plus a name.
 * <p>
 * The id defaults to {@code 100000} until it is explicitly set or overwritten by deserialisation.
 */
public class ArchaeologicalResourceType {

    private long id = 100000;
    private String name;

    /** Creates an instance with the default id and no name. */
    public ArchaeologicalResourceType() {
    }

    /** @return the numeric identifier of this resource type */
    public long getId() {
        return id;
    }

    /** @param id the numeric identifier of this resource type */
    public void setId(long id) {
        this.id = id;
    }

    /** @return the name of this resource type, or {@code null} if none was set */
    public String getName() {
        return name;
    }

    /** @param name the name of this resource type */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Deserialises an instance from its JSON representation using Gson.
     *
     * @param json the JSON document to read
     * @return the deserialised instance, as returned by {@code Gson.fromJson(String, Class)}
     */
    public static ArchaeologicalResourceType fromJson(String json){
        return new Gson().fromJson(json, ArchaeologicalResourceType.class);
    }
}

View File

@ -1,52 +0,0 @@
package eu.dnetlib.ariadneplus.elasticsearch.model;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import java.util.Map;
/**
 * Model bean holding the id and the name of an archeological resource type.
 */
public class ArcheologicalResourceType {

    private String id;
    private String name;

    /** Creates an empty instance; both fields start out as {@code null}. */
    public ArcheologicalResourceType() {
    }

    /**
     * Builds an instance from an RDF/JSON object keyed by property URI.
     * <p>
     * Only the ARIADNE {@code id} and {@code name} properties are read; for each of them the
     * {@code "value"} member of the first array element is used. Other properties are ignored.
     *
     * @param json the RDF/JSON object describing the resource type
     * @return the populated instance
     */
    public static ArcheologicalResourceType fromRDFJson(JsonElement json) {
        final ArcheologicalResourceType resourceType = new ArcheologicalResourceType();
        for (Map.Entry<String, JsonElement> property : json.getAsJsonObject().entrySet()) {
            final String predicate = property.getKey();
            if ("https://www.ariadne-infrastructure.eu/property/id".equals(predicate)) {
                resourceType.setId(firstValueOf(property.getValue()));
            } else if ("https://www.ariadne-infrastructure.eu/property/name".equals(predicate)) {
                resourceType.setName(firstValueOf(property.getValue()));
            }
        }
        return resourceType;
    }

    /** Reads the {@code "value"} string of the first element of an RDF/JSON value array. */
    private static String firstValueOf(JsonElement values) {
        return values.getAsJsonArray().get(0).getAsJsonObject().get("value").getAsString();
    }

    /** @return the identifier of this resource type */
    public String getId() {
        return id;
    }

    /** @param id the identifier of this resource type */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the name of this resource type */
    public String getName() {
        return name;
    }

    /** @param name the name of this resource type */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Deserialises an instance from its JSON representation using Gson.
     *
     * @param json the JSON document to read
     * @return the deserialised instance
     */
    public static ArcheologicalResourceType fromJson(String json){
        final Gson gson = new Gson();
        return gson.fromJson(json, ArcheologicalResourceType.class);
    }
}

View File

@ -9,7 +9,7 @@ public class AriadneCatalogEntry {
private List<AatSubject> aatSubjects;
private String accessPolicy;
private String accessRights;
private ArcheologicalResourceType archeologicalResourceType;
private ArchaeologicalResourceType archaeologicalResourceType;
private String contactPoint;
private List<AgentInfo> contributor;
private List<AgentInfo> creator;
@ -80,12 +80,12 @@ public class AriadneCatalogEntry {
this.accessRights = accessRights;
}
public ArcheologicalResourceType getArcheologicalResourceType() {
return archeologicalResourceType;
public ArchaeologicalResourceType getArchaeologicalResourceType() {
return archaeologicalResourceType;
}
public void setArcheologicalResourceType(ArcheologicalResourceType archeologicalResourceType) {
this.archeologicalResourceType = archeologicalResourceType;
public void setArchaeologicalResourceType(ArchaeologicalResourceType archaeologicalResourceType) {
this.archaeologicalResourceType = archaeologicalResourceType;
}
public String getContactPoint() {

View File

@ -354,16 +354,18 @@ public class GraphDBClient {
runSPQRLQuery.setParser(parseRDFJSON);
runSPQRLQuery.setResourceManager(resourceManager);
runSPQRLQuery.setBulkUpload(bulkUpload);
String recordId = "<https://ariadne-infrastructure.eu/aocat/Resource/02E4F4B5-24B7-3AD7-B460-CFA8B1F0BD1F>";
List<String> recordIds = Arrays.asList(recordId);
// List<String> recordIds = runSPQRLQuery.selectRecordIds(datasource, collectionId);
// String recordId = "https://ariadne-infrastructure.eu/aocat/Resource/02E4F4B5-24B7-3AD7-B460-CFA8B1F0BD1F";
// List<String> recordIds = Arrays.asList(recordId);
List<String> recordIds = runSPQRLQuery.selectRecordIds(datasource, collectionId);
final ClassPathResource queryTemplateResource = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_record_data_template.sparql");
String queryTemplate = IOUtils.toString(queryTemplateResource.getInputStream(), StandardCharsets.UTF_8.name());
runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, recordIds, datasource, collectionId);
// List<String> collectionResourceId = runSPQRLQuery.selectCollectionId(datasource, collectionId);
// final ClassPathResource selectCollectionTemplateRes = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql");
// String selectCollectionTemplate = IOUtils.toString(selectCollectionTemplateRes.getInputStream(), StandardCharsets.UTF_8.name());
// runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, collectionResourceId, datasource, collectionId);
boolean isCollection = false;
runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, recordIds, datasource, collectionId, isCollection);
List<String> collectionResourceId = runSPQRLQuery.selectCollectionId(datasource, collectionId);
final ClassPathResource selectCollectionTemplateRes = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql");
String selectCollectionTemplate = IOUtils.toString(selectCollectionTemplateRes.getInputStream(), StandardCharsets.UTF_8.name());
isCollection = true;
runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, collectionResourceId, datasource, collectionId, isCollection);
}catch(Throwable e){
log.error(e);
throw new AriadnePlusPublisherException(e);

View File

@ -49,8 +49,8 @@ public class ResourceManager {
this.type_path = type_path;
this.general_classpath = general_classpath;
this.exclude_predicates = exclude_predicates;
propertiesMap = new PropertiesMap();
propertiesMap.fill(spec);
this.spec = spec;
init();
}
@PostConstruct
@ -87,7 +87,7 @@ public class ResourceManager {
if(entry instanceof LinkedHashMap){
LinkedHashMap tmp = (LinkedHashMap)((JSONArray)((LinkedHashMap)entry).get(type_path)).get(0);
class_name = (String)tmp.get("value");
if (class_name.equals("provided record")) {
if (class_name.equals("provided record") || class_name.equals("Dataset Collection")) {
class_name = "AriadneCatalogEntry";
}
}

View File

@ -59,19 +59,19 @@ public class RunSPARQLQueryService {
manager.shutDown();
}
public String executeMultipleQueryGraph(String queryTemplate, List<String> recordIds, String datasource, String collectionId){
public String executeMultipleQueryGraph(String queryTemplate, List<String> recordIds, String datasource, String collectionId, boolean isCollection){
if (queryTemplate==null)
return null;
final String selectQueryTemplate = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId);
recordIds.forEach(recordId -> {
executeQueryGraph(selectQueryTemplate, recordId);
executeQueryGraph(selectQueryTemplate, recordId, isCollection);
});
return "ok";
}
private String executeQueryGraph(String selectQueryTemplate, String recordId){
log.debug("Retrieving "+recordId+" ...");
String query = selectQueryTemplate.replaceAll("%record", recordId);
private String executeQueryGraph(String selectQueryTemplate, String recordId, boolean isCollection){
log.debug("Retrieving "+recordId+" - isCollection:"+isCollection );
String query = selectQueryTemplate.replaceAll("%record", "<"+recordId+">");
openConnection();
StringWriter recordWriter = null;
Model resultsModel = null;
@ -90,6 +90,9 @@ public class RunSPARQLQueryService {
recordWriter = new StringWriter();
RDFWriter rdfRecordWriter = Rio.createWriter(RDFFormat.RDFJSON, recordWriter);
Rio.write(resultsModel, rdfRecordWriter);
if (isCollection) {
parser.setCollection(true);
}
parser.parse(recordWriter.toString());
resourceManager.manage(parser);
bulkUpload.index(resourceManager);

View File

@ -3,6 +3,9 @@ package eu.dnetlib.ariadneplus.reader.json;
import java.util.Iterator;
import java.util.LinkedHashMap;
import eu.dnetlib.ariadneplus.reader.RunSPARQLQueryService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@ -18,11 +21,18 @@ import net.minidev.json.parser.ParseException;
@Service
public class ParseRDFJSON {
private static final Log log = LogFactory.getLog(ParseRDFJSON.class);
static JSONObject map ;
@Value("${catalog.entry.path}")
private String catalogEntryJsonPath;
@Value("${catalog.entry.collection.path}")
private String catalogEntryCollectionJsonPath;
private boolean isCollection = false;
private String json;
private Iterator<Object> it ;
@ -45,8 +55,10 @@ public class ParseRDFJSON {
setJson(json);
fillMap();
DocumentContext jsonContext = JsonPath.parse(json);
log.debug(getCatalogEntryJsonPath());
JSONArray entries = jsonContext.read(getCatalogEntryJsonPath());
int size = entries.size();
log.debug("num elements in json: "+size);
it = entries.iterator();
}
@ -63,10 +75,21 @@ public class ParseRDFJSON {
}
/**
 * Returns the JSONPath expression used to select entries from the parsed document:
 * the collection expression while the collection flag is set, the record expression otherwise.
 */
public String getCatalogEntryJsonPath() {
    return isCollection ? catalogEntryCollectionJsonPath : catalogEntryJsonPath;
}
/**
 * Sets the JSONPath expression that {@link #getCatalogEntryJsonPath()} returns while the
 * collection flag is not set.
 * NOTE(review): no matching setter for the collection expression is visible here - confirm how
 * it is populated when this class is instantiated outside the Spring context.
 */
public void setCatalogEntryJsonPath(String catalogEntryJsonPath) {
this.catalogEntryJsonPath = catalogEntryJsonPath;
}
/** Tells whether {@link #getCatalogEntryJsonPath()} currently returns the collection expression. */
public boolean isCollection() {
return isCollection;
}
/**
 * Chooses which JSONPath expression {@link #getCatalogEntryJsonPath()} returns.
 *
 * @param collection {@code true} to select the collection expression, {@code false} for the record one
 */
public void setCollection(boolean collection) {
    this.isCollection = collection;
}
}

View File

@ -5,7 +5,7 @@ server.port=8281
graphdb.serverUrl=http://graphdb-test.ariadne.d4science.org:7200
graphdb.writer.user=writer
graphdb.writer.pwd=********
graphdb.writer.pwd=*****
graphdb.repository=ariadneplus-ts01
graphdb.baseURI=https://ariadne-infrastructure.eu/
@ -13,6 +13,7 @@ elasticsearch.hostname=elastic-test.ariadne.d4science.org
elasticsearch.indexname=catalog_test
catalog.entry.path=$[*][?(@['https://www.ariadne-infrastructure.eu/property/resourceType'][0]['value']=='provided record')]
catalog.entry.collection.path=$[*][?(@['https://www.ariadne-infrastructure.eu/property/resourceType'][0]['value']=='Dataset Collection')]
general.classpath=eu.dnetlib.ariadneplus.elasticsearch.model.
type.path=https://www.ariadne-infrastructure.eu/property/resourceType
exclude.predicates=["https://www.ariadne-infrastructure.eu/property/resourceType", "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"]
@ -95,7 +96,7 @@ class.map.specifications={\
"element_type": "java.lang.String"\
}}\
},\
"ArcheologicalResourceType": {\
"ArchaeologicalResourceType": {\
"class_type": "unique",\
"mappings": {\
"https://www.ariadne-infrastructure.eu/property/id": {\
@ -204,8 +205,8 @@ class.map.specifications={\
"substring": "no"\
},\
"https://www.ariadne-infrastructure.eu/property/archeologicalResourceType": {\
"class_field": "ArcheologicalResourceType",\
"external_reference": "ArcheologicalResourceType",\
"class_field": "ArchaeologicalResourceType",\
"external_reference": "ArchaeologicalResourceType",\
"substring": "no"\
},\
"https://www.ariadne-infrastructure.eu/property/issued": {\

View File

@ -1,164 +0,0 @@
# CONSTRUCT query that reshapes AO-Cat (aocat:) statements about ?record into a flat set of
# aoprop: triples. The source graphs are hard-coded in the FROM clauses below.
PREFIX aocat: <https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX aoprop: <https://www.ariadne-infrastructure.eu/property/>
PREFIX onto: <http://www.ontotext.com/>
PREFIX ariadneplus: <https://ariadne-infrastructure.eu/aocat/>
PREFIX time: <http://www.w3.org/2006/time#>
# Output template: one aoprop: triple per variable bound in the WHERE clause.
CONSTRUCT {
?record aoprop:identifier ?record .
?record aoprop:originalId ?originalId .
?record aoprop:issued ?issued .
?record aoprop:modified ?modified .
?record aoprop:partOf ?partOf .
?record aoprop:creator ?creator .
?creator aoprop:name ?creatorName .
?creator aoprop:email ?creatorEmail .
?record aoprop:contributor ?contributor .
?contributor aoprop:name ?contributorName .
?contributor aoprop:email ?contributorEmail .
?record aoprop:legalResponsible ?legalResponsible .
?legalResponsible aoprop:name ?legalResponsibleName .
?legalResponsible aoprop:email ?legalResponsibleEmail .
?record aoprop:owner ?owner .
?owner aoprop:name ?ownerName .
?owner aoprop:email ?ownerEmail .
?record aoprop:publisher ?publisher .
?publisher aoprop:name ?publisherName .
?publisher aoprop:email ?publisherEmail .
?record aoprop:accessPolicy ?accessPolicy .
?record aoprop:accessRights ?accessRights .
?record aoprop:landingPage ?landingPage .
?record aoprop:spatialRegion ?spatialRegion .
?spatialRegion aoprop:placeName ?spatialPlaceName .
?spatialRegion aoprop:spatialCoordinateSystem ?spatialCoordinateSystem .
?record aoprop:spatialRegionPoint ?spatialRegionPoint .
?spatialRegionPoint aoprop:lat ?spatialLocationLat .
?spatialRegionPoint aoprop:lon ?spatialLocationLon .
?record aoprop:spatialRegionBox ?spatialRegionBox .
?spatialRegionBox aoprop:boxMaxLat ?spatialLocationBBMaxLat .
?spatialRegionBox aoprop:boxMaxLon ?spatialLocationBBMaxLon .
?spatialRegionBox aoprop:boxMinLat ?spatialLocationBBMinLat .
?spatialRegionBox aoprop:boxMinLon ?spatialLocationBBMinLon .
# NOTE(review): both ?temporal and ?temporalNative are attached through aoprop:uri - confirm this is intended.
?record aoprop:uri ?temporal .
?temporal aoprop:periodName ?temporalPeriodName .
?temporal aoprop:from ?temporalFrom .
?temporal aoprop:until ?temporalUntil .
?record aoprop:uri ?temporalNative .
?temporalNative aoprop:periodName ?temporalNativePeriodName .
?record aoprop:archeologicalResourceType ?archeologicalResourceType .
?archeologicalResourceType aoprop:name ?archeologicalResourceTypeName .
?record aoprop:resourceType ?resourceType .
?record aoprop:nativeSubject ?nativeSubject .
?nativeSubject aoprop:prefLabel ?nativeSubjectPrefLabel .
?nativeSubject aoprop:rdfAbout ?nativeSubject .
# Derived subjects are emitted twice: as derivedSubject (with a constant source) and as aatSubjects.
?record aoprop:derivedSubject ?derivedSubject .
?derivedSubject aoprop:prefLabel ?derivedSubjectPrefLabel .
?derivedSubject aoprop:source "Getty AAT" .
?record aoprop:aatSubjects ?derivedSubject .
?derivedSubject aoprop:id ?derivedSubject .
?derivedSubject aoprop:label ?derivedSubjectPrefLabel .
?record aoprop:title ?title .
?record aoprop:description ?description .
?record aoprop:language ?language .
}
# Default graph = union of these five graphs. They are literal IRIs, not placeholders,
# so this query only covers the "ads" graphs named here.
from <https://ariadne-infrastructure.eu/api_________::ariadne_plus::ads::aat>
from <https://ariadne-infrastructure.eu/ariadneplus::ads::aatplus>
from <https://ariadne-infrastructure.eu/ariadneplus::ads::periodo>
from <https://ariadne-infrastructure.eu/ariadneplus::ads::periodoplus>
from <https://ariadne-infrastructure.eu/api_________::ariadne_plus::ads::271>
where {
# Mandatory patterns: a record lacking any of these yields no solution at all.
?record aocat:has_language / skos:prefLabel ?language .
?record aocat:has_original_id ?originalId .
?record aocat:is_part_of ?partOf .
?record aocat:has_creator ?creator .
?creator aocat:has_name ?creatorName .
?record aocat:has_title ?title .
?record aocat:has_type / skos:prefLabel ?resourceType .
?record aocat:has_native_subject ?nativeSubject .
?nativeSubject skos:prefLabel ?nativeSubjectPrefLabel .
# Optional record-level properties.
optional {
?record aocat:has_derived_subject ?derivedSubject .
?derivedSubject skos:prefLabel ?derivedSubjectPrefLabel .
}
optional {
?creator aocat:has_email ?creatorEmail .
}
optional {
?record aocat:has_description ?description .
}
optional {
?record aocat:has_access_policy / rdfs:label ?accessPolicy .
}
optional {
?record aocat:has_landing_page / rdfs:label ?landingPage .
}
optional {
?record aocat:has_temporal_coverage ?temporalNative .
?temporalNative aocat:has_native_period / skos:prefLabel ?temporalNativePeriodName .
}
optional {
?record aocat:has_temporal_coverage ?temporal .
?temporal aocat:has_period / skos:prefLabel ?temporalPeriodName .
# from/until are only bound when both are present.
optional {
?temporal aocat:from ?temporalFrom .
?temporal aocat:until ?temporalUntil .
}
}
# Sub-select: values read from the collection the record is part of (dates, agents, access rights,
# ARIADNE subject). Only the agents' e-mail addresses are optional.
{
select *
where {
?record aocat:is_part_of ?collection .
?collection aocat:was_issued ?issued .
?collection aocat:was_modified ?modified .
?collection aocat:has_contributor ?contributor .
?contributor aocat:has_name ?contributorName .
?collection aocat:has_responsible ?legalResponsible .
?legalResponsible aocat:has_name ?legalResponsibleName .
?collection aocat:has_owner ?owner .
?owner aocat:has_name ?ownerName .
?collection aocat:has_publisher ?publisher .
?publisher aocat:has_name ?publisherName .
?collection aocat:has_access_rights ?accessRights .
?collection aocat:has_ARIADNE_subject ?archeologicalResourceType .
?archeologicalResourceType skos:prefLabel ?archeologicalResourceTypeName .
optional {
?contributor aocat:has_email ?contributorEmail .
}
optional {
?legalResponsible aocat:has_email ?legalResponsibleEmail .
}
optional {
?owner aocat:has_email ?ownerEmail .
}
optional {
?publisher aocat:has_email ?publisherEmail .
}
}
}
# Sub-select: spatial coverage. A place name is required; coordinate system, point and
# bounding box are optional.
{
select *
where {
?record aocat:has_spatial_coverage ?spatialRegion .
?spatialRegion aocat:has_place_name ?spatialPlaceName .
optional {
?spatialRegion aocat:has_coordinate_system ?spatialCoordinateSystem .
}
optional {
?record aocat:has_spatial_coverage ?spatialRegionPoint .
?spatialRegionPoint aocat:has_latitude ?spatialLocationLat ;
aocat:has_longitude ?spatialLocationLon .
}
optional {
?record aocat:has_spatial_coverage ?spatialRegionBox .
?spatialRegionBox aocat:has_bounding_box_max_lat ?spatialLocationBBMaxLat ;
aocat:has_bounding_box_max_lon ?spatialLocationBBMaxLon ;
aocat:has_bounding_box_min_lat ?spatialLocationBBMinLat ;
aocat:has_bounding_box_min_lon ?spatialLocationBBMinLon ;
}
}
}
}

View File

@ -26,7 +26,7 @@ public class GraphDbReaderAndESIndexTest {
private RunSPARQLQueryService runSPQRLQuery;
@Test
@Ignore
// @Ignore
public void readAndIndexTest() throws Exception {
final ClassPathResource resource = new ClassPathResource("application.properties");
Properties appProps = new Properties();
@ -36,7 +36,7 @@ public class GraphDbReaderAndESIndexTest {
runSPQRLQuery.setupConnection(
appProps.getProperty("graphdb.writer.user"),
appProps.getProperty("graphdb.writer.pwd"),
appProps.getProperty("repository.url"),
appProps.getProperty("graphdb.serverUrl"),
appProps.getProperty("graphdb.repository"));
ParseRDFJSON parseRDFJSON = new ParseRDFJSON();
parseRDFJSON.setCatalogEntryJsonPath(appProps.getProperty("catalog.entry.path"));
@ -48,12 +48,11 @@ public class GraphDbReaderAndESIndexTest {
appProps.getProperty("exclude.predicates"),
appProps.getProperty("class.map.specifications")
);
resourceManager.init();
runSPQRLQuery.setResourceManager(resourceManager);
BulkUpload bulkUpload = new BulkUpload();
bulkUpload.init();
bulkUpload.init(appProps.getProperty("elasticsearch.hostname"),appProps.getProperty("elasticsearch.indexname"));
runSPQRLQuery.setBulkUpload(bulkUpload);
String recordId = "<https://ariadne-infrastructure.eu/aocat/Resource/02E4F4B5-24B7-3AD7-B460-CFA8B1F0BD1F>";
String recordId = "<https://ariadne-infrastructure.eu/aocat/Resource/C6D951BA-069B-3E39-B93C-9BAE2C48B280>";
String datasource = "ads";
String collectionId = "271";
List<String> recordIds = Arrays.asList(recordId);
@ -69,14 +68,15 @@ public class GraphDbReaderAndESIndexTest {
final ClassPathResource resource = new ClassPathResource("application.properties");
Properties appProps = new Properties();
appProps.load(resource.getInputStream());
String datasource = "ads";
String collectionId = "271";
runSPQRLQuery = new RunSPARQLQueryService();
runSPQRLQuery.setupConnection(
appProps.getProperty("graphdb.writer.user"),
appProps.getProperty("graphdb.writer.pwd"),
appProps.getProperty("repository.url"),
appProps.getProperty("graphdb.repository"));
runSPQRLQuery.selectRecordIds();
runSPQRLQuery.selectRecordIds(datasource, collectionId);
}
@Test

View File

@ -0,0 +1,155 @@
package eu.dnetlib.ariadneplus.workflows.nodes;
import com.google.common.collect.Lists;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.procs.Token;
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.net.ConnectException;
import java.util.List;
public class IndexOnESJobNode extends AsyncJobNode {
private static final Log log = LogFactory.getLog(IndexOnESJobNode.class);
private String eprParam;
@Autowired
private ResultSetClient resultSetClient;
private String publisherEndpoint;
private String datasourceInterface;
private String datasource;
//for parallel requests to the publisher endpoint
private int nThreads = 5;
@Override
protected String execute(final Env env) throws Exception {
int statusCode = -1;
String indexOnESResult = "noResult";
log.info("Publisher endpoint: " + getPublisherEndpoint());
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(nThreads);
CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
log.info("IndexOnES endpoint: " + getIndexOnESEndpoint());
CloseableHttpResponse responsePOST = null;
try {
HttpPost post = new HttpPost(getIndexOnESEndpoint());
List<NameValuePair> params = Lists.newArrayList();
String[] splits = getDatasourceInterface().split("::");
String datasource = splits[2];
String collectionId = splits[3];
params.add(new BasicNameValuePair("datasource", datasource));
params.add(new BasicNameValuePair("collectionId", collectionId));
UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
post.setEntity(ent);
log.info("Calling IndexOnES endpoint with params: "+getDatasource()+" "+getDatasourceInterface());
responsePOST = client.execute(post);
statusCode = responsePOST.getStatusLine().getStatusCode();
switch (statusCode) {
case 200:
log.info("index on ES completed");
break;
default:
log.error("error indexing on ES " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
break;
}
} catch (ConnectException ce) {
log.error(ce);
throw new MSROException("Unable to connect to Publisher endpoint" + getIndexOnESEndpoint());
}
catch (IOException e) {
log.error(e);
throw new MSROException("IO Error" + getIndexOnESEndpoint());
}
finally{
if(responsePOST != null) responsePOST.close();
client.close();
cm.shutdown();
}
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode));
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", indexOnESResult);
log.info(indexOnESResult);
if (statusCode!=200) {
throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]");
}
return Arc.DEFAULT_ARC;
}
public String getPublisherEndpoint() {
return publisherEndpoint;
}
private String getIndexOnESEndpoint() {
return publisherEndpoint.concat("/indexOnES");
}
public void setPublisherEndpoint(final String publisherEndpoint) {
this.publisherEndpoint = publisherEndpoint;
}
public ResultSetClient getResultSetClient() {
return resultSetClient;
}
public void setResultSetClient(final ResultSetClient resultSetClient) {
this.resultSetClient = resultSetClient;
}
public String getEprParam() {
return eprParam;
}
public void setEprParam(String eprParam) {
this.eprParam = eprParam;
}
public String getDatasourceInterface() {
return datasourceInterface;
}
public void setDatasourceInterface(String datasourceInterface) {
this.datasourceInterface = datasourceInterface;
}
@Override
protected void beforeStart(Token token) {
token.setProgressProvider(new ResultsetProgressProvider(token.getEnv().getAttribute(getEprParam(), ResultSet.class), this.resultSetClient));
}
public String getDatasource() {
return datasource;
}
public void setDatasource(String datasource) {
this.datasource = datasource;
}
}

View File

@ -17,4 +17,6 @@
<bean id="wfNodeEnrichGraphDBContent" class="eu.dnetlib.ariadneplus.workflows.nodes.EnrichGraphDBContentJobNode" scope="prototype"/>
<bean id="wfNodeImportPeriodoIntoGraphDB" class="eu.dnetlib.ariadneplus.workflows.nodes.ImportPeriodoIntoGraphDBJobNode" scope="prototype"/>
<bean id="wfNodeIndexOnES" class="eu.dnetlib.ariadneplus.workflows.nodes.IndexOnESJobNode" scope="prototype"/>
</beans>

View File

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value=""/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value=""/>
</HEADER>
<BODY>
<WORKFLOW_NAME>$name$</WORKFLOW_NAME>
<WORKFLOW_DESCRIPTION>$desc$</WORKFLOW_DESCRIPTION>
<WORKFLOW_INFO />
<WORKFLOW_FAMILY>aggregator</WORKFLOW_FAMILY>
<WORKFLOW_PRIORITY>$priority$</WORKFLOW_PRIORITY>
<DATASOURCE id="$dsId$" interface="$interface$" />
<CONFIGURATION status="WAIT_SYS_SETTINGS" start="MANUAL">
<PARAMETERS>
<PARAM name="publisherEndpoint" description="AriadnePlus Publisher Endpoint" required="true" managedBy="user" type="string">http://localhost:8080/ariadneplus/publish</PARAM>
</PARAMETERS>
<WORKFLOW>
<NODE name="IndexOnES" type="LaunchWorkflowTemplate" isStart="true">
<DESCRIPTION>Index on Elastic Search all records of a collection from GraphDB</DESCRIPTION>
<PARAMETERS>
<PARAM name="wfTemplateId" value="3819eb3d-fdea-4fc3-925f-9ce0f61be9ee_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ=="/>
<PARAM name="wfTemplateParams">
<MAP>
<ENTRY key="dsId" value="$dsId$" />
<ENTRY key="dsName" value="$dsName$" />
<ENTRY key="interface" value="$interface$" />
<ENTRY key="publisherEndpoint" ref="publisherEndpoint" />
</MAP>
</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
<DESTROY_WORKFLOW_TEMPLATE id="23ef4bb3-2383-45b4-9661-ab03472fcd52_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ==">
<PARAMETERS/>
</DESTROY_WORKFLOW_TEMPLATE>
</CONFIGURATION>
<NOTIFICATIONS/>
<SCHEDULING enabled="false">
<CRON>9 9 9 ? * *</CRON>
<MININTERVAL>10080</MININTERVAL>
</SCHEDULING>
<STATUS/>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,32 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="3819eb3d-fdea-4fc3-925f-9ce0f61be9ee_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ=="/>
<RESOURCE_TYPE value="WorkflowTemplateDSResourceType"/>
<RESOURCE_KIND value="WorkflowTemplateDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-16T16:53:35+02:00"/>
</HEADER>
<BODY>
<CONFIGURATION>
<PARAMETERS>
<PARAM description="Datasource Name" name="dsName" required="true" type="string"/>
<PARAM description="Datasource Id" name="dsId" required="true" type="string"/>
<PARAM description="Datasource Interface" name="interface" required="true" type="string"/>
<PARAM description="AriadnePlus Publisher Endpoint" name="publisherEndpoint" required="true" type="string">http://localhost:8281/ariadneplus-graphdb</PARAM>
</PARAMETERS>
<WORKFLOW>
<NODE isStart="true" name="indexOnES" type="IndexOnES">
<DESCRIPTION>Index on Elastic Search all records of a collection from GraphDB</DESCRIPTION>
<PARAMETERS>
<PARAM name="publisherEndpoint" ref="publisherEndpoint"/>
<PARAM name="datasourceInterface" ref="interface"/>
<PARAM name="datasource" ref="dsName"/>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
</CONFIGURATION>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="caf5903e-d040-4506-b00c-32db35d0cd59_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-16T12:00:00.0Z"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Index On ES</WORKFLOW_NAME>
<WORKFLOW_DESCRIPTION>Index on Elastic Search all records of a collection from GraphDB</WORKFLOW_DESCRIPTION>
<WORKFLOW_INFO>
<FIELD name="Action">IndexOnES</FIELD>
<FIELD name="Datasource class">Content Provider</FIELD>
</WORKFLOW_INFO>
<WORKFLOW_FAMILY>REPO_HI</WORKFLOW_FAMILY>
<WORKFLOW_PRIORITY>20</WORKFLOW_PRIORITY>
<CONFIGURATION status="EXECUTABLE" start="MANUAL">
<PARAMETERS/>
<WORKFLOW>
<NODE name="VerifyDatasource" type="VerifyDatasource" isStart="true">
<DESCRIPTION>Verify if DS is pending</DESCRIPTION>
<PARAMETERS>
<PARAM name="expectedInterfaceTypologyPrefixes" value=""/>
<PARAM name="expectedCompliancePrefixes" value="metadata,native"/>
</PARAMETERS>
<ARCS>
<ARC to="registerIndexOnESWf"/>
<ARC to="validateDs" name="validateDs"/>
</ARCS>
</NODE>
<NODE name="validateDs" type="ValidateDatasource">
<DESCRIPTION>Validate DS</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="registerIndexOnESWf"/>
</ARCS>
</NODE>
<NODE name="registerIndexOnESWf" type="RegisterWorkflowFromTemplate">
<DESCRIPTION>Create Workflow</DESCRIPTION>
<PARAMETERS>
<PARAM name="wfName" value="Index On ES"/>
<PARAM name="wfTemplate" value="/eu/dnetlib/ariadneplus/workflows/repo-hi/index_on_es_wf.xml.st"/>
<PARAM name="description" value="Index on Elastic Search all records of a collection from GraphDB"/>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
</CONFIGURATION>
<NOTIFICATIONS/>
<SCHEDULING enabled="false">
<CRON>9 9 9 ? * *</CRON>
<MININTERVAL>10080</MININTERVAL>
</SCHEDULING>
<STATUS/>
</BODY>
</RESOURCE_PROFILE>