mergin with branch beta

This commit is contained in:
Miriam Baglioni 2024-01-30 12:19:10 +01:00
commit ead08b0dd4
13 changed files with 586 additions and 430 deletions

43
CODE_OF_CONDUCT.md Normal file
View File

@ -0,0 +1,43 @@
# Contributor Code of Conduct
Openness, transparency and our community-driven participatory approach guide us in our day-to-day interactions and decision-making. Our open source projects are no exception. Trust, respect, collaboration and transparency are core values we believe should live and breathe within our projects. Our community welcomes participants from around the world with different experiences, unique perspectives, and great ideas to share.
## Our Pledge
In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment include:
- Using welcoming and inclusive language
- Being respectful of differing viewpoints and experiences
- Gracefully accepting constructive criticism
- Attempting collaboration before conflict
- Focusing on what is best for the community
- Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
- Violence, threats of violence, or inciting others to commit self-harm
- The use of sexualized language or imagery and unwelcome sexual attention or advances
- Trolling, intentionally spreading misinformation, insulting/derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or electronic address, without explicit permission
- Abuse of the reporting process to intentionally harass or exclude others
- Advocating for, or encouraging, any of the above behavior
- Other conduct which could reasonably be considered inappropriate in a professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant](https://www.contributor-covenant.org/), [version 1.4](https://www.contributor-covenant.org/version/1/4/code-of-conduct.html).

10
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,10 @@
# Contributing to D-Net Hadoop
:+1::tada: First off, thanks for taking the time to contribute! :tada::+1:
This project and everyone participating in it is governed by our [Code of Conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it).
The following is a set of guidelines for contributing to this project and its packages. These are mostly guidelines, not rules, which applies to this project as a while, including all its sub-modules.
Use your best judgment, and feel free to propose changes to this document in a pull request.
All contributions are welcome, all contributions will be considered to be contributed under the [project license](LICENSE.md).

View File

View File

@ -2,6 +2,11 @@
Dnet-hadoop is the project that defined all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing, provisioning.
This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDUCT.md).
By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it).
This project is licensed under the [AGPL v3 or later version](#LICENSE.md).
How to build, package and run oozie workflows
====================

View File

@ -8,10 +8,13 @@ import java.io.InputStream;
import java.net.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.http.HttpHeaders;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -94,14 +97,16 @@ public class HttpConnector2 {
throw new CollectorException(msg);
}
log.info("Request attempt {} [{}]", retryNumber, requestUrl);
InputStream input = null;
long start = System.currentTimeMillis();
try {
if (getClientParams().getRequestDelay() > 0) {
backoffAndSleep(getClientParams().getRequestDelay());
}
log.info("Request attempt {} [{}]", retryNumber, requestUrl);
final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
urlConn.setInstanceFollowRedirects(false);
urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000);
@ -115,9 +120,8 @@ public class HttpConnector2 {
urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue());
}
}
if (log.isDebugEnabled()) {
logHeaderFields(urlConn);
}
int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
String rateLimit = urlConn.getHeaderField(Constants.HTTPHEADER_IETF_DRAFT_RATELIMIT_LIMIT);
@ -132,9 +136,7 @@ public class HttpConnector2 {
}
if (is2xx(urlConn.getResponseCode())) {
input = urlConn.getInputStream();
responseType = urlConn.getContentType();
return input;
return getInputStream(urlConn, start);
}
if (is3xx(urlConn.getResponseCode())) {
// REDIRECTS
@ -144,6 +146,7 @@ public class HttpConnector2 {
.put(
REPORT_PREFIX + urlConn.getResponseCode(),
String.format("Moved to: %s", newUrl));
logRequestTime(start);
urlConn.disconnect();
if (retryAfter > 0) {
backoffAndSleep(retryAfter);
@ -159,26 +162,50 @@ public class HttpConnector2 {
if (retryAfter > 0) {
log
.warn(
"{} - waiting and repeating request after suggested retry-after {} sec.",
requestUrl, retryAfter);
"waiting and repeating request after suggested retry-after {} sec for URL {}",
retryAfter, requestUrl);
backoffAndSleep(retryAfter * 1000);
} else {
log
.warn(
"{} - waiting and repeating request after default delay of {} sec.",
requestUrl, getClientParams().getRetryDelay());
backoffAndSleep(retryNumber * getClientParams().getRetryDelay() * 1000);
"waiting and repeating request after default delay of {} sec for URL {}",
getClientParams().getRetryDelay(), requestUrl);
backoffAndSleep(retryNumber * getClientParams().getRetryDelay());
}
report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl);
logRequestTime(start);
urlConn.disconnect();
return attemptDownload(requestUrl, retryNumber + 1, report);
case 422: // UNPROCESSABLE ENTITY
report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl);
log.warn("waiting and repeating request after 10 sec for URL {}", requestUrl);
backoffAndSleep(10000);
urlConn.disconnect();
logRequestTime(start);
try {
return getInputStream(urlConn, start);
} catch (IOException e) {
log
.error(
"server returned 422 and got IOException accessing the response body from URL {}",
requestUrl);
log.error("IOException:", e);
return attemptDownload(requestUrl, retryNumber + 1, report);
}
default:
log.error("gor error {} from URL: {}", urlConn.getResponseCode(), urlConn.getURL());
log.error("response message: {}", urlConn.getResponseMessage());
report
.put(
REPORT_PREFIX + urlConn.getResponseCode(),
String
.format(
"%s Error: %s", requestUrl, urlConn.getResponseMessage()));
logRequestTime(start);
urlConn.disconnect();
throw new CollectorException(urlConn.getResponseCode() + " error " + report);
}
}
@ -199,13 +226,27 @@ public class HttpConnector2 {
}
}
private InputStream getInputStream(HttpURLConnection urlConn, long start) throws IOException {
InputStream input = urlConn.getInputStream();
responseType = urlConn.getContentType();
logRequestTime(start);
return input;
}
private static void logRequestTime(long start) {
log
.info(
"request time elapsed: {}sec",
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start));
}
private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
log.debug("StatusCode: {}", urlConn.getResponseMessage());
log.info("Response: {} - {}", urlConn.getResponseCode(), urlConn.getResponseMessage());
for (Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
if (e.getKey() != null) {
for (String v : e.getValue()) {
log.debug(" key: {} - value: {}", e.getKey(), v);
log.info(" key: {} - value: {}", e.getKey(), v);
}
}
}
@ -225,7 +266,7 @@ public class HttpConnector2 {
for (String key : headerMap.keySet()) {
if ((key != null) && key.equalsIgnoreCase(HttpHeaders.RETRY_AFTER) && (!headerMap.get(key).isEmpty())
&& NumberUtils.isCreatable(headerMap.get(key).get(0))) {
return Integer.parseInt(headerMap.get(key).get(0)) + 10;
return Integer.parseInt(headerMap.get(key).get(0));
}
}
return -1;

View File

@ -25,7 +25,7 @@ case class mappingAffiliation(name: String) {}
case class mappingAuthor(
given: Option[String],
family: String,
family: Option[String],
sequence: Option[String],
ORCID: Option[String],
affiliation: Option[mappingAffiliation]
@ -226,14 +226,14 @@ case object Crossref2Oaf {
//Mapping Author
val authorList: List[mappingAuthor] =
(json \ "author").extractOrElse[List[mappingAuthor]](List())
(json \ "author").extract[List[mappingAuthor]].filter(a => a.family.isDefined)
val sorted_list = authorList.sortWith((a: mappingAuthor, b: mappingAuthor) =>
a.sequence.isDefined && a.sequence.get.equalsIgnoreCase("first")
)
result.setAuthor(sorted_list.zipWithIndex.map { case (a, index) =>
generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull, index)
generateAuhtor(a.given.orNull, a.family.get, a.ORCID.orNull, index)
}.asJava)
// Mapping instance

File diff suppressed because one or more lines are too long

View File

@ -22,6 +22,13 @@ class CrossrefMappingTest {
val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)
val mapper = new ObjectMapper()
@Test
def testMissingAuthorParser():Unit = {
val json: String = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/s41567-022-01757-y.json")).mkString
val result = Crossref2Oaf.convert(json)
result.filter(o => o.isInstanceOf[Publication]).map(p=> p.asInstanceOf[Publication]).foreach(p =>assertTrue(p.getAuthor.size()>0))
}
@Test
def testFunderRelationshipsMapping(): Unit = {
val template = Source

View File

@ -114,7 +114,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -142,7 +142,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -170,7 +170,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -198,7 +198,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -225,8 +225,8 @@
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/orcid/orcidprop</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
</spark>
<ok to="fork-join-exec-propagation"/>
<error to="Kill"/>
@ -247,9 +247,10 @@
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--executor-cores=4
--executor-memory=4G
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=5G
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -259,9 +260,9 @@
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
@ -291,7 +292,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
@ -321,7 +322,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
@ -351,7 +352,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>

View File

@ -31,16 +31,19 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
final ContextMapper contextMapper = new ContextMapper();
for (ContextSummary ctx : DNetRestClient.doGET(baseURL + "/contexts", ContextSummaryList.class)) {
for (ContextSummary ctx : DNetRestClient
.doGET(String.format("%s/contexts", baseURL), ContextSummaryList.class)) {
contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType()));
for (CategorySummary cat : DNetRestClient
.doGET(baseURL + "/context/" + ctx.getId(), CategorySummaryList.class)) {
.doGET(String.format("%s/context/%s?all=true", baseURL, ctx.getId()), CategorySummaryList.class)) {
contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), "category", ""));
if (cat.isHasConcept()) {
for (ConceptSummary c : DNetRestClient
.doGET(baseURL + "/context/category/" + cat.getId(), ConceptSummaryList.class)) {
.doGET(
String.format("%s/context/category/%s?all=true", baseURL, cat.getId()),
ConceptSummaryList.class)) {
contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", ""));
if (c.isHasSubConcept()) {
for (ConceptSummary cs : c.getConcepts()) {

View File

@ -8,6 +8,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-stats-update</artifactId>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>

View File

@ -64,6 +64,26 @@
<name>hadoop_user_name</name>
<description>user name of the wf owner</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=6G --conf spark.executor.memoryOverhead=4G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=3840</value>
<description>spark resource options</description>
</property>
</parameters>
<global>
@ -78,6 +98,14 @@
<name>hive.txn.timeout</name>
<value>${hive_timeout}</value>
</property>
<property>
<name>hive.mapjoin.followby.gby.localtask.max.memory.usage</name>
<value>0.80</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>analytics</value>
@ -318,12 +346,23 @@
</action>
<action name="Step16-createIndicatorsTables">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step16-createIndicatorsTables.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>external_stats_db_name=${external_stats_db_name}</param>
</hive2>
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Step16-createIndicatorsTables</name>
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-stats-update-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hive_metastore_uris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16-createIndicatorsTables.sql</arg>
<arg>--stats_db_name</arg><arg>${stats_db_name}</arg>
<arg>--external_stats_db_name</arg><arg>${external_stats_db_name}</arg>
</spark>
<ok to="Step16_1-definitions"/>
<error to="Kill"/>
</action>
@ -383,18 +422,18 @@
<error to="Kill"/>
</action>
<!-- <action name="step20-createMonitorDB-post">-->
<!-- <shell xmlns="uri:oozie:shell-action:0.1">-->
<!-- <job-tracker>${jobTracker}</job-tracker>-->
<!-- <name-node>${nameNode}</name-node>-->
<!-- <exec>monitor-post.sh</exec>-->
<!-- <argument>${monitor_db_name}</argument>-->
<!-- <argument>${monitor_db_shadow_name}</argument>-->
<!-- <file>monitor-post.sh</file>-->
<!-- </shell>-->
<!-- <ok to="step21-createObservatoryDB-pre"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="step20-createMonitorDB-post">-->
<!-- <shell xmlns="uri:oozie:shell-action:0.1">-->
<!-- <job-tracker>${jobTracker}</job-tracker>-->
<!-- <name-node>${nameNode}</name-node>-->
<!-- <exec>monitor-post.sh</exec>-->
<!-- <argument>${monitor_db_name}</argument>-->
<!-- <argument>${monitor_db_shadow_name}</argument>-->
<!-- <file>monitor-post.sh</file>-->
<!-- </shell>-->
<!-- <ok to="step21-createObservatoryDB-pre"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="step21-createObservatoryDB-pre">
<shell xmlns="uri:oozie:shell-action:0.1">
@ -439,8 +478,8 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>copyDataToImpalaCluster.sh</exec>
<!-- <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>-->
<!-- <argument>${external_stats_db_name}</argument>-->
<!-- <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>-->
<!-- <argument>${external_stats_db_name}</argument>-->
<argument>${stats_db_name}</argument>
<argument>${monitor_db_name}</argument>
<argument>${observatory_db_name}</argument>