Merge branch 'hadoop_aggregator' of code-repo.d4science.org:D-Net/dnet-hadoop into hadoop_aggregator

This commit is contained in:
Michele Artini 2021-02-04 09:46:13 +01:00
commit 3ea8c328ac
3 changed files with 11 additions and 9 deletions

View File

@@ -149,14 +149,14 @@ public class OaiIterator implements Iterator<String> {
try { try {
doc = reader.read(new StringReader(xml)); doc = reader.read(new StringReader(xml));
} catch (final DocumentException e) { } catch (final DocumentException e) {
log.warn("Error parsing xml, I try to clean it: " + xml, e); log.warn("Error parsing xml, I try to clean it. {}", e.getMessage());
final String cleaned = XmlCleaner.cleanAllEntities(xml); final String cleaned = XmlCleaner.cleanAllEntities(xml);
try { try {
doc = reader.read(new StringReader(cleaned)); doc = reader.read(new StringReader(cleaned));
} catch (final DocumentException e1) { } catch (final DocumentException e1) {
final String resumptionToken = extractResumptionToken(xml); final String resumptionToken = extractResumptionToken(xml);
if (resumptionToken == null) { if (resumptionToken == null) {
throw new CollectorException("Error parsing cleaned document:" + cleaned, e1); throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1);
} }
return resumptionToken; return resumptionToken;
} }

View File

@@ -19,16 +19,19 @@ import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor; import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
/** /**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module * CollectorWorkerApplication is the main class responsible for starting the metadata collection process and storing the outcomes
* will be executed on the hadoop cluster and taking in input some parameters that tells it which is the right collector * into HDFS. This application will be executed on the hadoop cluster, where, invoked in the context of the metadata collection
* plugin to use and where store the data into HDFS path * oozie workflow, it will receive all the input parameters necessary to instantiate the specific collection plugin and its
* related plugin-specific configuration
* *
* @author Sandro La Bruzzo * @author Sandro La Bruzzo, Claudio Atzori
*/ */
public class CollectorWorkerApplication { public class CollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class); private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
public static final String COLLECTOR_WORKER_ERRORS = "collectorWorker-errors";
/** /**
* @param args * @param args
*/ */
@@ -60,7 +63,7 @@ public class CollectorWorkerApplication {
final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath); final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath);
CollectorPluginErrorLogList errors = worker.collect(); CollectorPluginErrorLogList errors = worker.collect();
populateOOZIEEnv("collectorErrors", errors.toString()); populateOOZIEEnv(COLLECTOR_WORKER_ERRORS, errors.toString());
} }

View File

@@ -87,6 +87,7 @@
<arg>--apidescriptor</arg><arg>${apiDescription}</arg> <arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg> <arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg> <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<capture-output/>
</java> </java>
<ok to="GenerateNativeStoreSparkJob"/> <ok to="GenerateNativeStoreSparkJob"/>
<error to="FailCollection"/> <error to="FailCollection"/>
@@ -133,7 +134,6 @@
<arg>--action</arg><arg>READ_UNLOCK</arg> <arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg> <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg> <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java> </java>
<ok to="CommitVersion"/> <ok to="CommitVersion"/>
<error to="Kill"/> <error to="Kill"/>
@@ -165,7 +165,6 @@
<arg>--action</arg><arg>READ_UNLOCK</arg> <arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg> <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg> <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java> </java>
<ok to="RollBack"/> <ok to="RollBack"/>
<error to="Kill"/> <error to="Kill"/>