Compare commits

...

8 Commits

9 changed files with 34 additions and 7 deletions

View File

@ -1,15 +1,22 @@
package eu.dnetlib.dhp.utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import javax.xml.ws.BindingProvider;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class ISLookupClientFactory {
private static final Log log = LogFactory.getLog(ISLookupClientFactory.class);
private static final Logger log = LoggerFactory.getLogger(ISLookupClientFactory.class);
private static int requestTimeout = 60000 * 10;
private static int connectTimeout = 60000 * 10;
public static ISLookUpService getLookUpService(final String isLookupUrl) {
return getServiceStub(ISLookUpService.class, isLookupUrl);
@ -21,6 +28,25 @@ public class ISLookupClientFactory {
final JaxWsProxyFactoryBean jaxWsProxyFactory = new JaxWsProxyFactoryBean();
jaxWsProxyFactory.setServiceClass(clazz);
jaxWsProxyFactory.setAddress(endpoint);
return (T) jaxWsProxyFactory.create();
final T service = (T) jaxWsProxyFactory.create();
if (service instanceof BindingProvider) {
log
.info(
"setting timeouts for {} to requestTimeout: {}, connectTimeout: {}",
BindingProvider.class.getName(), requestTimeout, connectTimeout);
Map<String, Object> requestContext = ((BindingProvider) service).getRequestContext();
requestContext.put("com.sun.xml.internal.ws.request.timeout", requestTimeout);
requestContext.put("com.sun.xml.internal.ws.connect.timeout", connectTimeout);
requestContext.put("com.sun.xml.ws.request.timeout", requestTimeout);
requestContext.put("com.sun.xml.ws.connect.timeout", connectTimeout);
requestContext.put("javax.xml.ws.client.receiveTimeout", requestTimeout);
requestContext.put("javax.xml.ws.client.connectionTimeout", connectTimeout);
}
return service;
}
}

View File

@ -48,12 +48,13 @@ public class IndexOnESJob {
final JavaRDD<String> inputRdd = ClusterUtils
.readPath(spark, eventsPath, Event.class)
// .limit(10000) // TODO REMOVE
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
.javaRDD();
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");

View File

@ -7,7 +7,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-worfklow-profiles</artifactId>
<artifactId>dhp-workflow-profiles</artifactId>
<packaging>jar</packaging>
<!--

View File

@ -16,7 +16,7 @@
<description>This module is the container for the oozie workflow definitions in dnet-hadoop project</description>
<modules>
<module>dhp-worfklow-profiles</module>
<module>dhp-workflow-profiles</module>
<module>dhp-aggregation</module>
<module>dhp-distcp</module>
<module>dhp-actionmanager</module>