1
0
Fork 0

better logging, WIP: collectorWorker error reporting; common functions moved in DHPUtils

This commit is contained in:
Claudio Atzori 2021-02-06 20:12:00 +01:00
parent a8a758925e
commit 40df0f987d
24 changed files with 138 additions and 185 deletions

View File

@ -11,34 +11,4 @@ import com.google.common.collect.Maps;
public class ApplicationUtils {
public static Configuration getHadoopConfiguration(String nameNode) {
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", nameNode);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("hadoop.home.dir", "/");
return conf;
}
public static void populateOOZIEEnv(final Map<String, String> report) throws IOException {
File file = new File(System.getProperty("oozie.action.output.properties"));
Properties props = new Properties();
report.forEach((k, v) -> props.setProperty(k, v));
try(OutputStream os = new FileOutputStream(file)) {
props.store(os, "");
}
}
public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
Map<String, String> report = Maps.newHashMap();
report.put(paramName, value);
populateOOZIEEnv(report);
}
}

View File

@ -1,18 +1,29 @@
package eu.dnetlib.dhp.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
@ -21,6 +32,8 @@ import scala.collection.Seq;
public class DHPUtils {
private static final Logger log = LoggerFactory.getLogger(DHPUtils.class);
public static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
@ -79,4 +92,72 @@ public class DHPUtils {
return "";
}
}
public static final ObjectMapper MAPPER = new ObjectMapper();
public static void writeHdfsFile(final Configuration conf, final String content, final String path)
throws IOException {
log.info("writing file {}, size {}", path, content.length());
try (FileSystem fs = FileSystem.get(conf);
BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
os.write(content.getBytes(StandardCharsets.UTF_8));
os.flush();
}
}
public static String readHdfsFile(Configuration conf, String path) throws IOException {
log.info("reading file {}", path);
try (FileSystem fs = FileSystem.get(conf)) {
final Path p = new Path(path);
if (!fs.exists(p)) {
throw new FileNotFoundException(path);
}
return IOUtils.toString(fs.open(p));
}
}
public static <T> T readHdfsFileAs(Configuration conf, String path, Class<T> clazz) throws IOException {
return MAPPER.readValue(readHdfsFile(conf, path), clazz);
}
public static <T> void saveDataset(final Dataset<T> mdstore, final String targetPath) {
log.info("saving dataset in: {}", targetPath);
mdstore
.write()
.mode(SaveMode.Overwrite)
.format("parquet")
.save(targetPath);
}
public static Configuration getHadoopConfiguration(String nameNode) {
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", nameNode);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("hadoop.home.dir", "/");
return conf;
}
public static void populateOOZIEEnv(final Map<String, String> report) throws IOException {
File file = new File(System.getProperty("oozie.action.output.properties"));
Properties props = new Properties();
report.forEach((k, v) -> props.setProperty(k, v));
try (OutputStream os = new FileOutputStream(file)) {
props.store(os, "");
}
}
public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
Map<String, String> report = Maps.newHashMap();
report.put(paramName, value);
populateOOZIEEnv(report);
}
}

View File

@ -18,7 +18,7 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
/**
* Applies the parsing of a csv file and writes the Serialization of it in hdfs

View File

@ -15,7 +15,7 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
/**
* Applies the parsing of an excel file and writes the Serialization of it in hdfs

View File

@ -1,69 +0,0 @@
package eu.dnetlib.dhp.aggregation.common;
import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
public class AggregationUtility {
private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class);
public static final ObjectMapper MAPPER = new ObjectMapper();
public static void writeHdfsFile(final Configuration conf, final String content, final String path)
throws IOException {
log.info("writing file {}, size {}", path, content.length());
try (FileSystem fs = FileSystem.get(conf);
BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
os.write(content.getBytes(StandardCharsets.UTF_8));
os.flush();
}
}
public static String readHdfsFile(Configuration conf, String path) throws IOException {
log.info("reading file {}", path);
try (FileSystem fs = FileSystem.get(conf)) {
final Path p = new Path(path);
if (!fs.exists(p)) {
throw new FileNotFoundException(path);
}
return IOUtils.toString(fs.open(p));
}
}
public static <T> T readHdfsFileAs(Configuration conf, String path, Class<T> clazz) throws IOException {
return MAPPER.readValue(readHdfsFile(conf, path), clazz);
}
public static <T> void saveDataset(final Dataset<T> mdstore, final String targetPath) {
log.info("saving dataset in: {}", targetPath);
mdstore
.write()
.mode(SaveMode.Overwrite)
.format("parquet")
.save(targetPath);
}
}

View File

@ -1,18 +1,14 @@
package eu.dnetlib.dhp.aggregation.mdstore;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -80,29 +76,20 @@ public class MDStoreActionNode {
throw new IllegalArgumentException(
"invalid MDStoreVersion value current is " + mdStoreVersion_params);
}
Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
try (
FileSystem fs = FileSystem.get(URI.create(hdfsuri), getHadoopConfiguration(hdfsuri));
FSDataInputStream inputStream = fs.open(hdfstoreSizepath)) {
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + "/size");
fs.create(hdfstoreSizepath);
DNetRestClient
.doGET(
String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
}
FSDataInputStream inputStream = fs.open(hdfstoreSizepath);
final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
inputStream.close();
fs.create(hdfstoreSizepath);
DNetRestClient
.doGET(String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
break;
}
case ROLLBACK: {

View File

@ -2,8 +2,8 @@
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;

View File

@ -4,7 +4,7 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public interface CollectorPlugin {

View File

@ -15,8 +15,8 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.HttpClientParams;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public class OaiCollectorPlugin implements CollectorPlugin {

View File

@ -17,9 +17,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
import eu.dnetlib.dhp.collection.worker.XmlCleaner;
public class OaiIterator implements Iterator<String> {

View File

@ -3,9 +3,9 @@ package eu.dnetlib.dhp.collection.plugin.oai;
import java.util.Iterator;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
public class OaiIteratorFactory {

View File

@ -1,9 +1,8 @@
package eu.dnetlib.dhp.collection.worker.utils;
package eu.dnetlib.dhp.collection.worker;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class CollectorPluginFactory {

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.collection.worker.utils;
package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.MAPPER;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.Closeable;
import java.io.IOException;
@ -45,7 +45,7 @@ public class CollectorPluginReport extends LinkedHashMap<String, String> impleme
}
public Boolean isSuccess() {
return Boolean.valueOf(get(SUCCESS));
return containsKey(SUCCESS) && Boolean.valueOf(get(SUCCESS));
}
public void setSuccess(Boolean success) {
@ -58,7 +58,7 @@ public class CollectorPluginReport extends LinkedHashMap<String, String> impleme
if (Objects.nonNull(fos)) {
log.info("writing report {} to {}", data, path.toString());
IOUtils.write(data, fos);
ApplicationUtils.populateOOZIEEnv(this);
populateOOZIEEnv(this);
}
}
}

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.SEQUENCE_FILE_NAME;
import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
@ -17,10 +16,6 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.message.MessageSender;

View File

@ -2,29 +2,21 @@
package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileSystemUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.message.MessageSender;

View File

@ -2,10 +2,7 @@
package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.REPORT_FILE_NAME;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.MAPPER;
import static eu.dnetlib.dhp.application.ApplicationUtils.getHadoopConfiguration;
import static eu.dnetlib.dhp.application.ApplicationUtils.populateOOZIEEnv;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.IOException;
import java.util.Objects;
@ -13,15 +10,11 @@ import java.util.Objects;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
/**
* CollectorWorkerReporter

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker.utils;
package eu.dnetlib.dhp.collection.worker;
/**
* Bundles the http connection parameters driving the client behaviour.

View File

@ -1,5 +1,7 @@
package eu.dnetlib.dhp.collection.worker.utils;
package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.IOException;
import java.io.InputStream;
@ -13,8 +15,6 @@ import org.apache.http.HttpHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.worker.CollectorException;
/**
* Migrated from https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-modular-collector-service/trunk/src/main/java/eu/dnetlib/data/collector/plugins/HttpConnector.java
*
@ -162,11 +162,19 @@ public class HttpConnector2 {
}
}
throw new CollectorException(
String.format("Unexpected status code: %s error %s", urlConn.getResponseCode(), report));
String
.format(
"Unexpected status code: %s errors: %s", urlConn.getResponseCode(),
MAPPER.writeValueAsString(report)));
} catch (MalformedURLException | SocketException | UnknownHostException e) {
log.error(e.getMessage(), e);
report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e.getMessage(), e);
} catch (SocketTimeoutException e) {
log.error(e.getMessage(), e);
report.put(e.getClass().getName(), e.getMessage());
backoffAndSleep(getClientParams().getRetryDelay() * retryNumber * 1000);
return attemptDownload(requestUrl, retryNumber + 1, report);
}
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker.utils;
package eu.dnetlib.dhp.collection.worker;
public class UnknownCollectorPluginException extends Exception {

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker.utils;
package eu.dnetlib.dhp.collection.worker;
import java.util.HashMap;
import java.util.HashSet;

View File

@ -2,8 +2,8 @@
package eu.dnetlib.dhp.transformation;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.IOException;
import java.util.Map;

View File

@ -14,7 +14,7 @@ import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
@Disabled
public class EXCELParserTest {

View File

@ -3,17 +3,13 @@ package eu.dnetlib.dhp.collector.worker;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.nio.file.Path;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.worker.CollectorWorker;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.CollectorPluginFactory;
import eu.dnetlib.dhp.collection.worker.HttpClientParams;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
@Disabled

View File

@ -1,13 +1,14 @@
package eu.dnetlib.dhp.collector.worker.utils;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.IOException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
public class CollectorPluginReportTest {
@ -17,11 +18,11 @@ public class CollectorPluginReportTest {
r1.put("a", "b");
r1.setSuccess(true);
String s = AggregationUtility.MAPPER.writeValueAsString(r1);
String s = MAPPER.writeValueAsString(r1);
Assertions.assertNotNull(s);
CollectorPluginReport r2 = AggregationUtility.MAPPER.readValue(s, CollectorPluginReport.class);
CollectorPluginReport r2 = MAPPER.readValue(s, CollectorPluginReport.class);
Assertions.assertTrue(r2.isSuccess(), "should be true");
}