All records related to a datasource API are now published on GraphDB into n named graphs, to work around an out-of-memory issue
This commit is contained in:
parent e748d3d802 · commit 8530e488d0
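The change partitions each datasource API's records across several named graphs instead of a single one. A minimal sketch of the naming and rollover rule, as it can be read off the diff below (class and method names here are hypothetical; NUM_RECORDS_THRESHOLD is the constant this commit introduces):

public class PartitionNamingSketch {

    // threshold introduced by this commit (records per partition graph)
    public static final int NUM_RECORDS_THRESHOLD = 10;

    // Given the base graph URI of a datasource API, the partition counter
    // stored in the provenance graph, and the record count of the current
    // partition, compute the named graph the next record should go to.
    static String nextPartitionGraph(String apiGraph, int numPartitions, int currentRecordsCount) {
        if (numPartitions == 0) {
            numPartitions = 1; // no partition yet: start with partition 1
        }
        if (currentRecordsCount >= NUM_RECORDS_THRESHOLD) {
            numPartitions += 1; // current partition is full: roll over
        }
        return apiGraph + "_partition_" + numPartitions;
    }

    public static void main(String[] args) {
        // hypothetical API graph URI, partition 1 already holding 10 records
        System.out.println(nextPartitionGraph("https://example.org/api_1", 1, 10));
        // prints https://example.org/api_1_partition_2
    }
}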
@@ -8,10 +8,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.rdf4j.RDF4JException;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryResult;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.rio.RDFFormat;
@@ -29,7 +34,8 @@ public class GraphDBClient {

    private static final Log log = LogFactory.getLog(GraphDBClient.class);

    public static final String PROVENANCE_NS = "http://www.d-net.research-infrastructures.eu/provenance/";

    public static final int NUM_RECORDS_THRESHOLD = 10;

    private RecordParserHelper recordParserHelper;
    private String graphDBServerUrl;
    private String graphDBBaseURI;
@@ -58,27 +64,129 @@ public class GraphDBClient {
RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
manager.init();
manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
log.debug("manager init");
Repository repository = manager.getRepository(getRepository());
ValueFactory factory = repository.getValueFactory();
- String dsInterface = recordParserHelper.getDatasourceApi(record);
- IRI graph = factory.createIRI(dsInterface);
+ String datasourceApi = recordParserHelper.getDatasourceApi(record);
+ IRI graph = factory.createIRI(datasourceApi);
+ IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
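// rApi names the datasource API resource under which the partition counter (num_partitions) is kept in the provenance graph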
log.debug("query current num partitions for graph " + graph);
|
||||
String graphName = graph.toString();
|
||||
boolean hasPartition = false;
|
||||
int numPartitions = 0;
|
||||
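// look up the current partition counter for this API in the datasourceApis provenance graph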
try (RepositoryConnection con = repository.getConnection()) {
    TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select ?num_partitions\n" +
            "where {\n" +
            "  graph <https://ariadne-infrastructure.eu/datasourceApis> {\n" +
            "    <" + rApi + "> <http://www.d-net.research-infrastructures.eu/provenance/num_partitions> ?num_partitions\n" +
            "  }\n" +
            "}");
    TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
    if (tupleQueryResult.hasNext()) {
        hasPartition = true;
        numPartitions = Integer.parseInt(tupleQueryResult.next().getValue("num_partitions").stringValue());
        log.debug("numPartitions: " + numPartitions);
    }
    con.close();
}
catch (RDF4JException e) {
    log.error("error counting partitions ...", e);
}

if (!hasPartition) {
    graphName = graph.toString().concat("_partition_").concat("1");
}
else {
    if (numPartitions == 0) {
        log.debug("partition not already created, default numPartitions set to 1");
        numPartitions += 1;
    }
    graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
}

log.debug("query current records count on graph " + graphName);
int currentRecordsCount = 0;
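// count the records already stored in the current partition graph (the COUNT/2 suggests each record contributes two has_identifier triples)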
try (RepositoryConnection con = repository.getConnection()) {
    TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select (COUNT(?has_identifier)/2 AS ?count)\n" +
            "where {\n" +
            "  graph <" + graphName + "> {\n" +
            "    ?x <https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/has_identifier> ?has_identifier\n" +
            "  }\n" +
            "}");
    TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
    if (tupleQueryResult.hasNext()) {
        currentRecordsCount = Integer.parseInt(tupleQueryResult.next().getValue("count").stringValue());
        log.debug("currentRecordsCount: " + currentRecordsCount);
    }
    con.close();
}
catch (RDF4JException e) {
    log.error("error counting records ...", e);
}

int origNumPartitions = numPartitions;
boolean numRecordsThresholdReached = false;
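// when the current partition has reached the threshold, switch the insert target to a new partition graph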
if (currentRecordsCount >= NUM_RECORDS_THRESHOLD) {
    numRecordsThresholdReached = true;
    numPartitions += 1;
    graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
    log.debug("threshold reached graphname is: " + graphName);
}
else {
    log.debug("threshold not reached graphname is: " + graphName);
}

try (RepositoryConnection con = repository.getConnection()) {
    log.debug("connection established");
    con.begin();
-   String recordURI = getRecordURI(objIdentifier, dsInterface);
-   log.debug("Trying to adding record with recordURI " + recordURI + " into graph " + graph);
-   con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, graph);
+   String recordURI = getRecordURI(objIdentifier, datasourceApi);
+   log.debug("Adding record " + recordURI + " into graph " + graphName);
+   con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, factory.createIRI(graphName));
    con.commit();
    log.debug("statement added");
    con.close();
}
catch (RDF4JException e) {
    log.error("error adding statement ...", e);
}

if (numRecordsThresholdReached) {
    log.debug("updating current numPartitionsStmt");
    Statement numPartitionsStmt = null;
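    // swap the stored num_partitions value for this API: remove the old statement, add one with the incremented counter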
    try (RepositoryConnection con = repository.getConnection()) {
        IRI pred = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
        Literal curValue = factory.createLiteral(origNumPartitions);
        IRI datasourceApisGraph = factory.createIRI("https://ariadne-infrastructure.eu/datasourceApis");
        RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, pred, curValue, false, datasourceApisGraph);
        if (numPartitionsStmts.hasNext()) {
            con.begin();
            numPartitionsStmt = numPartitionsStmts.next();
            log.debug("current numPartitionsStmt retrieved " + numPartitionsStmt.toString() + " inside " + datasourceApisGraph.toString());
            con.remove(numPartitionsStmt, datasourceApisGraph);
            log.debug("current numPartitionsStmt removed");
            Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions));
            con.add(numPartitionStmtUpdated, datasourceApisGraph);
            log.debug("numPartitionsStmt updated");
            con.commit();
            con.close();
        }
        else {
            con.begin();
            Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions++));
            con.add(numPartitionStmtUpdated, datasourceApisGraph);
            log.debug("numPartitionsStmt updated");
            con.commit();
            con.close();
        }
        if (con.isActive()) {
            con.close();
        }
    }
    catch (RDF4JException e) {
-       log.error("error adding statement ...", e);
+       log.error("error updating num partition statement ", e);
    }
}

repository.shutDown();
manager.shutDown();
log.debug("manager shutDown");
return 1;
} catch (Throwable e) {
    log.error(e);
@@ -105,7 +213,6 @@ public class GraphDBClient {
Statement stmInsertedDate = factory.createStatement(rApi, INSERTED_IN_DATE, factory.createLiteral(now.toString()));
IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
try (RepositoryConnection con = repository.getConnection()) {
    log.debug("connection established");
    con.begin();
    log.debug("Adding stmt " + stmApi.toString() + " into graph " + datasourceApisGraph.toString());
    con.add(stmApi, datasourceApisGraph);
@@ -121,7 +228,6 @@ public class GraphDBClient {
    }
    repository.shutDown();
    manager.shutDown();
    log.debug("manager shutDown");
    return 200;
}
catch (Throwable e) {
@@ -130,39 +236,44 @@ public class GraphDBClient {
    }
}

/**
 * Delete all triples in the named graphs collected from the given API.
 * @param api the id of the API
 * @return the number of triples deleted from the named graphs associated with the given API
 */
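// currently a stub: the Jena-based implementation below is commented out, so the method always returns 0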
public long drop(final String api) {
//    Model prov = null;
//    //look for all named graphs associated to the api
//    Resource rApi = ResourceFactory.createResource(defaultBaseURI + api);
//    long deletedTriples = 0;
//    final ResIterator resIterator = prov.listSubjectsWithProperty(COLL_FROM, rApi);
//    while (resIterator.hasNext()) {
//        Resource namedGraphURI = resIterator.nextResource();
//        //delete all triples belonging to the r named graph
//        deletedTriples += dropNamedGraph(namedGraphURI.getURI());
//        //delete the named graph from the provenance graph
//        prov.removeAll(namedGraphURI, null, null);
//    }
//    //delete the api from the provenance graph
//    prov.removeAll(null, null, rApi);
//    prov.removeAll(rApi, null, null);
//    prov.close();
//    return deletedTriples;
    return 0;
}

- private long dropNamedGraph(String namedGraphURI) {
- //    Model namedGraph = null;
- //    long deletedTriples = namedGraph.size();
- //    namedGraph.removeAll();
- //    namedGraph.close();
- //    return deletedTriples;
-     return 0;
- }
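// removes the num_partitions bookkeeping statement for the given datasource API from the datasourceApis graph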
public long dropDatasourceApisPartitionInfo(final String datasourceApi) throws AriadnePlusPublisherException {

    try {
        log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);
        RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
        manager.init();
        manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
        Repository repository = manager.getRepository(getRepository());
        ValueFactory factory = repository.getValueFactory();
        IRI HAS_NUM_PARTITIONS = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
        IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
        IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
        Statement numPartitionsStmt = null;
        try (RepositoryConnection con = repository.getConnection()) {
            log.debug("Removing datasourceApi partition info s:" + rApi.toString() + " p:" + HAS_NUM_PARTITIONS + " g:" + datasourceApisGraph);
            RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, HAS_NUM_PARTITIONS, null, false, datasourceApisGraph);
            if (numPartitionsStmts.hasNext()) {
                con.begin();
                numPartitionsStmt = numPartitionsStmts.next();
                int numPartitions = Integer.parseInt(numPartitionsStmt.getObject().stringValue());
                log.debug(" old partitions count: " + numPartitions);
                con.remove(rApi, HAS_NUM_PARTITIONS, factory.createLiteral(numPartitions), datasourceApisGraph);
                con.commit();
                con.close();
            }
        }
        catch (RDF4JException e) {
            log.error("error removing datasourceApi partition info ", e);
            throw new AriadnePlusPublisherException(e);
        }
        repository.shutDown();
        manager.shutDown();
        return 200;
    }
    catch (Throwable e) {
        log.error("error removing datasourceApi partition info ", e);
        throw new AriadnePlusPublisherException(e);
    }
}

private String getRecordURI(final String objIdentifier, final String datasourceApi) {
@@ -193,7 +304,6 @@ public class GraphDBClient {
    log.warn("Got record with no objIdentifier -- skipping");
    return "";
}
log.debug(objIdentifier);
String rdfBlock = recordParserHelper.getRDF(record);
if (StringUtils.isBlank(rdfBlock)) {
    log.warn("Missing rdf:RDF in record with objIdentifier " + objIdentifier);
@@ -46,6 +46,13 @@ public class AriadnePlusPublisherController {
    getAriadnePlusPublisherHelper().feedProvenance(datasourceApi, getTarget(ariadneplusTarget));
}

@RequestMapping(value = "/dropDatasourceApisPartitionInfo", method = RequestMethod.POST)
public void dropDatasourceApisPartitionInfo(@RequestParam final String datasourceApi, @RequestParam(required = false) String ariadneplusTarget) throws AriadnePlusPublisherException {
    if (ariadneplusTarget == null) {
        ariadneplusTarget = DEFAULT_TARGET_ENDPOINT;
    }
    getAriadnePlusPublisherHelper().dropDatasourceApisPartitionInfo(datasourceApi, getTarget(ariadneplusTarget));
}

@RequestMapping(value = "/unpublish", method = RequestMethod.GET)
public void unpublish(@RequestParam final String datasourceApi, @RequestParam(required = false) String ariadneplusTarget) throws AriadnePlusPublisherException {
@@ -45,6 +45,16 @@ public class AriadnePlusPublisherHelper {

}

public void dropDatasourceApisPartitionInfo(final String datasourceApi, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
    switch (target) {
        case GRAPHDB:
            dropDatasourceApisPartitionInfo(datasourceApi);
            break;
        default: throw new AriadnePlusPublisherException("Target " + target + " not supported yet");
    }
}

public long unpublish(final String datasourceInterface, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
    long res = 0;
    switch (target) {
@@ -63,11 +73,17 @@ public class AriadnePlusPublisherHelper {
}

private void feedProvenance(final String datasourceApi) throws AriadnePlusPublisherException {
-   log.debug("Feed Provenance on graphdb");
+   log.debug("Feed Provenance " + datasourceApi);
    GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
    graphDBClient.feedProvenance(datasourceApi);
}

private void dropDatasourceApisPartitionInfo(final String datasourceApi) throws AriadnePlusPublisherException {
    log.debug("Drop DatasourceApis Partition Info " + datasourceApi);
    GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
    graphDBClient.dropDatasourceApisPartitionInfo(datasourceApi);
}

private long unpublishGraphDB(final String datasourceInterface) {
    log.info("Unpublishing from graphdb " + datasourceInterface);
    GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
@@ -66,8 +66,38 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
log.info("Publisher endpoint: " + getPublisherEndpoint());
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(nThreads);

CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();

log.info("DropDatasourceApisPartitionInfo endpoint: " + getDropDatasourceApisPartitionInfoEndpoint());
CloseableHttpResponse responseDDAPIPOST = null;
try {
    HttpPost post = new HttpPost(getDropDatasourceApisPartitionInfoEndpoint());
    List<NameValuePair> params = Lists.newArrayList();
    String datasourceInterfaceValue = getDatasourceInterface();
    log.info("drop datasourceApis partition info for datasourceInterface " + datasourceInterfaceValue);
    params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue));
    UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
    post.setEntity(ent);
    responseDDAPIPOST = client.execute(post);
    int statusCode = responseDDAPIPOST.getStatusLine().getStatusCode();
    switch (statusCode) {
        case 200:
            log.info("drop datasourceApis partition info completed");
            break;
        default:
            log.error("error dropping datasourceApis partition info " + responseDDAPIPOST.getStatusLine().getStatusCode() + ": " + responseDDAPIPOST.getStatusLine().getReasonPhrase());
            break;
    }
} catch (ConnectException ce) {
    throw new MSROException("unable to connect to Publisher endpoint" + getPublishEndpoint());
}
catch (IOException e) {
    log.error("error feeding provenance ", e);
}
finally {
    if (responseDDAPIPOST != null) responseDDAPIPOST.close();
}

//need to slow down the producer to avoid OOM errors due to many tasks in the queue of the executor
//see for example here: https://stackoverflow.com/questions/42108351/executorservice-giving-out-of-memory-error
//let's stop and wait after submission of nLatch tasks
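The stop-and-wait logic the comment above describes is elided from this hunk; a minimal self-contained sketch of the pattern (hypothetical names, with a Semaphore standing in for the wait on nLatch submitted tasks):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ThrottledSubmitSketch {
    public static void main(String[] args) throws InterruptedException {
        int nThreads = 4;
        int nLatch = 100; // bound on in-flight tasks; name borrowed from the comment above
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        Semaphore inFlight = new Semaphore(nLatch);
        for (int i = 0; i < 10000; i++) {
            inFlight.acquire(); // producer blocks once nLatch tasks are queued or running
            executorService.submit(() -> {
                try {
                    // ... POST one record to the publish endpoint ...
                } finally {
                    inFlight.release(); // free a slot so the producer can continue
                }
            });
        }
        executorService.shutdown();
    }
}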
@@ -86,20 +116,20 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
}
partial++;
Future<Integer> res = executorService.submit(() -> {
-   CloseableHttpResponse responsePOST = null;
+   CloseableHttpResponse responsePPOST = null;
    try {
        HttpPost post = new HttpPost(getPublishEndpoint());
        List<NameValuePair> params = Lists.newArrayList();
        params.add(new BasicNameValuePair("record", record));
        UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
        post.setEntity(ent);
-       responsePOST = client.execute(post);
-       int statusCode = responsePOST.getStatusLine().getStatusCode();
+       responsePPOST = client.execute(post);
+       int statusCode = responsePPOST.getStatusLine().getStatusCode();
        switch (statusCode) {
            case 200:
                return statusCode;
            default:
-               log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
+               log.error(responsePPOST.getStatusLine().getStatusCode() + ": " + responsePPOST.getStatusLine().getReasonPhrase());
                log.error("Source record causing error: " + record);
                errors.merge(statusCode, 1, Integer::sum);
                return statusCode;
@@ -112,7 +142,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
        errors.merge(-1, 1, Integer::sum);
    }
    finally {
-       if (responsePOST != null) responsePOST.close();
+       if (responsePPOST != null) responsePPOST.close();
    }
    return -1;
});
@@ -141,7 +171,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {

if (countOk > 0) {
    log.info("Feed provenance endpoint: " + getProvenanceFeedEndpoint());
-   CloseableHttpResponse responsePOST = null;
+   CloseableHttpResponse responsePFPOST = null;
    try {
        HttpPost post = new HttpPost(getProvenanceFeedEndpoint());
        List<NameValuePair> params = Lists.newArrayList();
@@ -150,14 +180,14 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
    params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue));
    UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
    post.setEntity(ent);
-   responsePOST = client.execute(post);
-   int statusCode = responsePOST.getStatusLine().getStatusCode();
+   responsePFPOST = client.execute(post);
+   int statusCode = responsePFPOST.getStatusLine().getStatusCode();
    switch (statusCode) {
        case 200:
            log.info("feed provenance completed");
            break;
        default:
-           log.error("error feeding provenance " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
+           log.error("error feeding provenance " + responsePFPOST.getStatusLine().getStatusCode() + ": " + responsePFPOST.getStatusLine().getReasonPhrase());
            break;
    }
} catch (ConnectException ce) {
@@ -167,7 +197,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
    log.error("error feeding provenance ", e);
}
finally {
-   if (responsePOST != null) responsePOST.close();
+   if (responsePFPOST != null) responsePFPOST.close();
    client.close();
    cm.shutdown();
}
@@ -188,6 +218,10 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
    return publisherEndpoint.concat("/feedProvenance");
}

private String getDropDatasourceApisPartitionInfoEndpoint() {
    return publisherEndpoint.concat("/dropDatasourceApisPartitionInfo");
}

public void setPublisherEndpoint(final String publisherEndpoint) {
    this.publisherEndpoint = publisherEndpoint;
}