Merge pull request 'Springify and dockerize project (fixed and improved)' (#2) from springify_project into master

Reviewed-on: lsmyrnaios/UrlsController#2
This commit is contained in:
Lampros Smyrnaios 2022-02-04 14:56:16 +01:00
commit 5d70e82504
18 changed files with 859 additions and 1239 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
# Gradle wrapper/daemon caches and IntelliJ IDEA project metadata.
.gradle/
.idea/
# Gradle build output directory.
build/
# derby.log — presumably written at runtime by an embedded Derby pulled in by a
# dependency (likely the Impala JDBC driver); TODO confirm the producer.
derby.log
# Application log output.
logs/
# IntelliJ module file generated inside the source tree.
src/main/main.iml

7
Dockerfile Normal file
View File

@ -0,0 +1,7 @@
# Runtime-only image: the fat jar is built outside Docker (by the gradle build),
# so this Dockerfile just packages and runs it on a JRE-capable JDK 8 base.
FROM openjdk:8-jdk-alpine
# Copy the Spring Boot fat jar produced by `gradle build` into the image root
# as a fixed name so the ENTRYPOINT does not depend on the project version.
COPY build/libs/*-SNAPSHOT.jar urls_controller.jar
# Port the controller listens on (must match server.port in the mounted
# application.properties — TODO confirm against the config file).
EXPOSE 1880
# Run the jar with externalized configuration; /mnt/config is expected to be a
# bind mount providing application.properties (see the install/run script).
ENTRYPOINT ["java","-jar","/urls_controller.jar", "--spring.config.location=file:///mnt/config/application.properties"]

View File

@ -48,10 +48,17 @@ dependencies {
implementation 'io.minio:minio:8.3.5'
// https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' // This is required by the minio, as Spring uses a version which is not supported by minio.
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
// This is required by the minio, as Spring uses a version which is not supported by minio.
// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation group: 'com.cloudera.impala', name: 'jdbc', version: '2.5.31'
implementation("com.cloudera.impala:jdbc:2.5.31") {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'org.apache.derby', module: 'derby'
exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all'
exclude group: 'log4j', module: 'log4j'
exclude group: 'log4j', module: 'apache-log4j-extras'
}
testImplementation group: 'org.springframework.security', name: 'spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"
@ -70,4 +77,4 @@ configurations {
test {
useJUnitPlatform()
}
}

View File

@ -1,11 +1,20 @@
cd "${0%/*}" || (echo "Could not chdir to this script's dir!" && exit) # Change the working directory to the script's directory, when running from other location.
justInstall=0
shouldRunInDocker=0
if [[ $# -eq 1 ]]; then
justInstall=$1
elif [[ $# -gt 1 ]]; then
echo -e "Wrong number of arguments given: ${#}\nPlease execute it like: script.sh <justInstall: 0 | 1>"; exit 1
elif [[ $# -eq 2 ]]; then
justInstall=$1
shouldRunInDocker=$2
elif [[ $# -gt 2 ]]; then
echo -e "Wrong number of arguments given: ${#}\nPlease execute it like: script.sh <justInstall: 0 | 1> <shouldRunInDocker: 0 | 1>"; exit 1
fi
if [[ justInstall -eq 1 && shouldRunInDocker -eq 1 ]]; then
echo -e "Cannot run in docker without re-building the project (just to be safe). Setting \"justInstall\" to < 0 >"
justInstall=0
fi
gradleVersion="7.3.3"
@ -27,10 +36,25 @@ if [[ justInstall -eq 0 ]]; then
#gradle tasks # For debugging installation
#gradle -v # For debugging installation
gradle clean
gradle build
gradle clean build
if [[ shouldRunInDocker -eq 1 ]]; then
echo "Give the username for the Docker Hub:"
read -r username
echo -e "\nBuilding docker image..\n"
sudo docker --version || (echo -e "Docker was not found!"; exit 9)
dockerImage=${username}"/urls_controller:latest"
sudo docker build -t "${dockerImage}" .
echo -e "\nPushing docker image.. (the account password is required)..\n"
(sudo docker login -u "${username}" && sudo docker push "${dockerImage}") || true
(sudo mkdir -p "$HOME"/tmp/config && sudo cp ./src/main/resources/application.properties "$HOME"/tmp/config) || true
sudo docker run -d --mount type=bind,source="$HOME"/tmp/config,target=/mnt/config -p 1880:1880 "${dockerImage}" && echo "The docker container started running."
# Run in "detached mode" (in the background).
fi
else
export PATH=/opt/gradle/gradle-${gradleVersion}/bin:$PATH # Make sure the gradle is still accessible (it usually isn't without the "export").
fi
gradle bootRun
if [[ shouldRunInDocker -ne 1 ]]; then
gradle bootRun
fi

View File

@ -1,10 +1,8 @@
package eu.openaire.urls_controller;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.util.S3ObjectStoreMinIO;
import eu.openaire.urls_controller.util.UriBuilder;
import org.springframework.boot.CommandLineRunner;
import eu.openaire.urls_controller.controllers.UrlController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@ -17,13 +15,15 @@ import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@EnableScheduling
public class Application {
private static final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
new S3ObjectStoreMinIO();
SpringApplication.run(Application.class, args);
}
@ -41,17 +41,30 @@ public class Application {
@PreDestroy
public static void preDestroy()
{
if ( ImpalaConnector.hikariDataSource != null )
ImpalaConnector.hikariDataSource.close();
public void closeThreads() {
logger.info("Shutting down the threads..");
UrlController.insertsExecutor.shutdown(); // Define that no new tasks will be scheduled.
try {
if ( ! UrlController.insertsExecutor.awaitTermination(1, TimeUnit.MINUTES) ) {
logger.warn("The working threads did not finish on time! Stopping them immediately..");
UrlController.insertsExecutor.shutdownNow();
}
} catch (SecurityException se) {
logger.error("Could not shutdown the threads in any way..!", se);
} catch (InterruptedException ie) {
try {
UrlController.insertsExecutor.shutdownNow();
} catch (SecurityException se) {
logger.error("Could not shutdown the threads in any way..!", se);
}
}
}
@Bean
public CommandLineRunner setServerBaseUrl(Environment environment)
{
return args -> new UriBuilder(environment);
}
//
// @Bean
// public CommandLineRunner setServerBaseUrl(Environment environment) {
// return args -> new UriBuilder(environment);
// }
}

View File

@ -1,255 +1,85 @@
package eu.openaire.urls_controller.configuration;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.beans.PropertyVetoException;
import java.io.File;
import java.io.FileReader;
import java.sql.*;
import java.util.Properties;
import javax.annotation.PostConstruct;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public final class ImpalaConnector {
@Repository
public class ImpalaConnector {
private static final Logger logger = LoggerFactory.getLogger(ImpalaConnector.class);
public static String impalaDriver;
public static String impalaConnectionUrl;
public static String oldDatabaseName;
public static String databaseName;
public static String poolName;
public static int hikariMaxConnectionPoolSize;
public static int hikariMinIdleConnections;
public static int hikariConnectionTimeOut;
public static int hikariIdleTimeOut;
public static int hikariMaxLifetime;
@Autowired
private JdbcTemplate jdbcTemplate;
private final String oldDatabaseName;
private final String databaseName;
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
public static HikariDataSource hikariDataSource;
private static final ImpalaConnector singletonObject = new ImpalaConnector();
public static ImpalaConnector getInstance()
{
return singletonObject;
public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.oldDatabaseName}") String oldDatabaseName,
@Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) {
this.oldDatabaseName = oldDatabaseName;
this.databaseName = databaseName;
}
public ImpalaConnector()
{
@PostConstruct
public void init() {
logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
try {
String dbSettingsPropertyFile = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "application.properties";
FileReader fReader = new FileReader(dbSettingsPropertyFile);
Properties props = new Properties();
props.load(fReader); // Load jdbc related properties.
// Get each property value.
impalaDriver = props.getProperty("spring.impala.driver-class-name");
if ( !"".equals(impalaDriver) ) { // If not "null" or empty.
Class.forName(impalaDriver);
impalaConnectionUrl = props.getProperty("spring.impala.url");
oldDatabaseName = props.getProperty("spring.impala.oldDatabaseName");
databaseName = props.getProperty("spring.impala.databaseName");
poolName = props.getProperty("spring.datasource.hikari.pool-name");
hikariMaxConnectionPoolSize = Integer.parseInt(props.getProperty("spring.datasource.hikari.maximumPoolSize"));
hikariMaxLifetime = Integer.parseInt(props.getProperty("spring.datasource.hikari.maxLifetime"));
hikariMinIdleConnections = Integer.parseInt(props.getProperty("spring.datasource.hikari.minimumIdle"));
hikariConnectionTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.connectionTimeout"));
hikariIdleTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.idleTimeout"));
} else
throw new RuntimeException("The \"impalaDriver\" was null or empty!");
} catch(Exception e) {
String errorMsg = "Error when loading the database properties!\n" + e.getMessage();
logger.error(errorMsg, e);
System.err.println(errorMsg);
System.exit(11);
boolean supportsBatchUpdates = jdbcTemplate.getDataSource().getConnection().getMetaData().supportsBatchUpdates();
logger.info("The database " + (supportsBatchUpdates ? "supports" : "does not support") + " \"BatchUpdates\"!");
} catch (Exception e) {
logger.error("Error testing if database supports batch updates!", e);
}
try {
hikariDataSource = impalaDS();
} catch (SQLException | PropertyVetoException e) {
logger.error("Problem when creating the Hikari connection pool!", e);
}
createDatabase();
createDatabase(); // In case of an exception, the App will exit with the stacktrace.
}
public HikariDataSource impalaDS() throws SQLException, PropertyVetoException
{
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setDriverClassName(ImpalaConnector.impalaDriver);
hikariConfig.setAutoCommit(true);
hikariConfig.setJdbcUrl(ImpalaConnector.impalaConnectionUrl);
hikariConfig.setPoolName(poolName);
hikariConfig.setMaximumPoolSize(hikariMaxConnectionPoolSize);
hikariConfig.setMaxLifetime(hikariMaxLifetime);
hikariConfig.setMinimumIdle(hikariMinIdleConnections);
hikariConfig.setConnectionTimeout(hikariConnectionTimeOut);
hikariConfig.setIdleTimeout(hikariIdleTimeOut);
return new HikariDataSource(hikariConfig);
}
public void createDatabase()
{
Connection con = getConnection();
if ( con == null )
System.exit(22);
try {
if ( !con.getMetaData().supportsBatchUpdates() )
logger.warn("The database does not support \"BatchUpdates\"!");
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
private void createDatabase() {
logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE.");
Statement statement = null;
try {
statement = con.createStatement();
} catch (SQLException sqle) {
logger.error("Problem when creating a connection-statement!\n" + sqle.getMessage());
ImpalaConnector.closeConnection(con);
System.exit(33);
}
try {
statement.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication");
statement.execute("COMPUTE STATS " + databaseName + ".publication");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids");
statement.execute("COMPUTE STATS " + databaseName + ".publication_pids");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_pids");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls");
statement.execute("COMPUTE STATS " + databaseName + ".publication_urls");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_urls");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource");
statement.execute("COMPUTE STATS " + databaseName + ".datasource");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".datasource");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
statement.execute("COMPUTE STATS " + databaseName + ".assignment");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment");
statement.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE");
jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
statement.execute("COMPUTE STATS " + databaseName + ".attempt");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
statement.execute("COMPUTE STATS " + databaseName + ".payload");
} catch (SQLException sqle) {
String errorMsg = "Problem when executing the \"create database and create tables queries!\n" + sqle.getMessage() + "\nSQL state: " + sqle.getSQLState() + "\nError code: " + sqle.getErrorCode();
logger.error(errorMsg, sqle);
System.err.println(errorMsg);
System.exit(44);
} finally {
try {
statement.close();
con.close();
} catch (SQLException sqle2) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
}
}
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload");
logger.info("The database \"" + databaseName + "\" and its tables were created or validated.");
}
public Connection getConnection()
{
try {
return hikariDataSource.getConnection();
//return DriverManager.getConnection(impalaConnectionUrl, null, null); // This is for non pooled connections.
} catch (SQLException sqle) {
logger.error("Problem when connecting with the Impala-database!\n" + sqle.getMessage());
return null;
}
}
public boolean testDatabaseAccess()
{
logger.info("Going to test Impala access..");
Connection con = getConnection();
if ( con == null )
return false;
ResultSet res = null;
try {
String tableName = "publication";
// show tables
String sql = "show tables '" + tableName + "'";
logger.debug("Running: " + sql);
res = con.prepareStatement(sql).executeQuery();
if ( res.next() ) {
logger.debug(res.getString(1));
}
// describe table
sql = "describe " + tableName;
logger.debug("Running: " + sql);
res = con.prepareStatement(sql).executeQuery();
while ( res.next() ) {
logger.debug(res.getString(1) + "\t" + res.getString(2));
}
// select * query
sql = "select * from " + tableName + " limit 3;";
logger.debug("Running: " + sql);
res = con.prepareStatement(sql).executeQuery();
while ( res.next() ) {
logger.debug(res.getString(1));
}
// Get Assignments, only for testing here.
//UrlController urlController = new UrlController();
//ResponseEntity<?> responseEntity = urlController.getUrls("worker_1", ControllerConstants.ASSIGNMENTS_LIMIT);
//logger.debug(responseEntity.toString());
} catch (SQLException sqle) {
logger.error(sqle.getMessage(), sqle);
return false;
} finally {
try {
if ( res != null )
res.close();
con.close();
} catch (SQLException sqle) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle);
}
}
return true;
}
public static boolean closeConnection(Connection con) {
try {
if ( con != null )
con.close(); // It may have already closed and that's fine.
return true;
} catch (SQLException sqle) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage());
return false;
}
}
public static String handlePreparedStatementException(String queryName, String query, String preparedStatementName, PreparedStatement preparedStatement, Connection con, Exception e)
{
public static String handleQueryException(String queryName, String query, Exception e) {
String errorMsg = "Problem when creating " + (( ! queryName.startsWith("get")) ? "and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n";
logger.error(errorMsg + "\n\n" + query + "\n\n" + e.getMessage(), e);
closeConnection(con);
logger.error(errorMsg + "\n\n" + query + "\n\n", e);
return errorMsg;
}

View File

@ -21,4 +21,5 @@ public class GeneralController {
return ResponseEntity.ok().build();
}
}

View File

@ -1,58 +1,41 @@
package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.sql.Connection;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
/**
* This controller will test the connectivity with the database and return statistics!
*/
@RestController
@RequestMapping("/impala")
public class ImpalaController {
// This controller will test the connectivity with the database and return statistics!
private static final Logger logger = LoggerFactory.getLogger(ImpalaController.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@GetMapping("get10PublicationIdsTest")
public ResponseEntity<?> get10PublicationIdsTest() {
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
String query = "SELECT id FROM publication LIMIT 10;";
try ( ResultSet res = con.prepareStatement(query).executeQuery()) {
if ( !res.first() ) {
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\"!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
List<String> publications = new ArrayList<>();
do {
publications.add(res.getString(0));
} while ( res.next() );
return new ResponseEntity<String>(publications.toString(), HttpStatus.OK);
try {
List<String> publications = jdbcTemplate.queryForList(query, String.class);
return new ResponseEntity<>(publications.toString(), HttpStatus.OK);
} catch (Exception e) {
String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query;
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
ImpalaConnector.closeConnection(con);
}
}
}

View File

@ -0,0 +1,88 @@
package eu.openaire.urls_controller.controllers;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.models.Assignment;
import eu.openaire.urls_controller.models.Datasource;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.TestFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Test-only endpoint that serves url-assignments to workers from a local JSON
 * resource file (via {@link TestFileUtils}) instead of querying the database.
 */
@RestController
@RequestMapping("/test")
public class TestController {
    private static final Logger logger = LoggerFactory.getLogger(TestController.class);
    // Reads id-url pairs from the bundled test resource file.
    @Autowired
    private TestFileUtils testFileUtils;
    // Upper bound on assignments this controller hands out per request
    // (configured in application.properties).
    @Value("${services.pdfaggregation.controller.assignmentLimit}")
    private int assignmentLimit;
    // Monotonic counter identifying each assignments-batch sent to workers.
    private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
    /**
     * Returns a batch of test assignments loaded from the input resource file.
     * @param workerId id of the requesting worker (echoed in logs and records).
     * @param workerAssignmentsLimit max number of assignments the worker wants.
     * @return 200 with an {@code AssignmentsResponse} (batch counter + assignments).
     */
    @GetMapping("urls")
    public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
        logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + this.assignmentLimit);
        logger.debug("Going to retrieve the data from the inputResourceFile: " + testFileUtils.testResource.getFilename());
        List<Assignment> assignments = new ArrayList<>();
        HashMultimap<String, String> loadedIdUrlPairs;
        boolean isFirstRun = true;
        boolean assignmentsLimitReached = false;
        Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
        // Start loading urls.
        while ( true ) {
            loadedIdUrlPairs = testFileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
            if ( testFileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
                break;
            else
                isFirstRun = false;
            Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();
            for ( Map.Entry<String,String> pair : pairs ) {
                if ( assignments.size() >= workerAssignmentsLimit ) {
                    assignmentsLimitReached = true;
                    break;
                }
                // Fabricate a pseudo-random datasource id/name for the test record.
                int randomNum = GenericUtils.getRandomNumber(1, 5);
                assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
            }// end pairs-for-loop
            if ( assignmentsLimitReached ) {
                logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
                break;
            }
        }// end loading-while-loop
        // Close the thread-local scanner left open by the file-loading loop, if any.
        Scanner scanner = testFileUtils.inputScanner.get();
        if ( scanner != null ) // Check if the initial value is null.
            scanner.close();
        long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
        logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + testFileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
        return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
    }
}

View File

@ -1,36 +1,55 @@
package eu.openaire.urls_controller.controllers;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.ControllerConstants;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@RestController
@RequestMapping("/urls")
public class UrlController {
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private FileUtils fileUtils;
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
private static int maxAttemptsPerRecord = ControllerConstants.MAX_ATTEMPTS_PER_RECORD;
@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}")
private int maxAttemptsPerRecord;
@Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentLimit;
@Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName;
@GetMapping("")
@ -43,7 +62,7 @@ public class UrlController {
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
}
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
@ -53,156 +72,100 @@ public class UrlController {
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
} else if ( assignmentsLimit > assignmentLimit ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
assignmentsLimit = assignmentLimit;
}
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + ImpalaConnector.databaseName + ".publication p\n" +
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
"from " + databaseName + ".publication p\n" +
"join " + databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
"left outer join (\n" +
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
" select a.id, a.original_url from " + databaseName + ".assignment a\n" +
" union all\n" +
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
" select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit " + assignmentsLimit + ") as findAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
String createCurrentAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment";
String getAssignmentsQuery = "select * from " + databaseName + ".current_assignment";
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) { // This is already logged in "getConnection()".
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
// All transactions in Impala automatically commit at the end of the statement. Currently, Impala does not support multi-statement transactions.
// https://impala.apache.org/docs/build/html/topics/impala_transactions.html
// We cannot use "savePoints" along with "autoCommit = false" to roll back to a previous state among multiple statements.
PreparedStatement createCurrentAssignmentsPreparedStatement = null;
try {
createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery);
// We cannot set the "limits" and the MAX_ATTEMPTS_PER_RECORD as preparedStatements parameters, as we get a "java.sql.SQLException: [Simba][JDBC](11420) Error, parameter metadata not populated."
createCurrentAssignmentsPreparedStatement.execute();
} catch (SQLException sqle) {
jdbcTemplate.execute(createCurrentAssignmentsQuery);
} catch (Exception sqle) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle);
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
if ( createCurrentAssignmentsPreparedStatement != null )
createCurrentAssignmentsPreparedStatement.close();
} catch (SQLException sqle2) {
logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
}
}
PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null;
try {
computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery);
computeCurrentAssignmentsStatsPreparedStatement.execute();
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
} catch (Exception sqle) {
String errorMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
if ( computeCurrentAssignmentsStatsPreparedStatement != null )
computeCurrentAssignmentsStatsPreparedStatement.close();
} catch (SQLException sqle2) {
logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage());
}
}
PreparedStatement getAssignmentsPreparedStatement = null;
try {
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
// The "getAssignmentsPreparedStatement" will always be null here, so we do not close it.
errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
// Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it.
/*if ( !resultSet.first() ) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg);
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
}*/
// The cursor is automatically before the first element in this configuration.
while ( resultSet.next() ) { // Move the cursor forward.
// If the resultsSet is empty, then the control will never get inside the loop.
// The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values.
try {
jdbcTemplate.query(getAssignmentsQuery, rs -> {
Assignment assignment = new Assignment();
assignment.setWorkerId(workerId);
assignment.setTimestamp(timestamp);
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned. The indexing starts from 1
assignment.setId(resultSet.getString(1));
assignment.setOriginalUrl(resultSet.getString(2));
datasource.setId(resultSet.getString(3));
datasource.setName(resultSet.getString(4));
assignment.setId(rs.getString(1));
assignment.setOriginalUrl(rs.getString(2));
datasource.setId(rs.getString(3));
datasource.setName(rs.getString(4));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle);
continue; // This object is broken, move to the next row.
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
assignment.setDatasource(datasource);
assignments.add(assignment);
}
});
} catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
String errorMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
logger.error(errorMsg, e);
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
getAssignmentsPreparedStatement.close();
} catch (SQLException sqle) {
logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
}
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
String errorMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.";
logger.error(errorMsg);
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
} else if ( assignmentsSize < assignmentsLimit ) {
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
@ -215,44 +178,37 @@ public class UrlController {
// Write the Assignment details to the assignment-table.
// The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
String insertAssignmentsQuery = "insert into " + databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
+ "from (\n select pubid, url from " + databaseName + ".current_assignment) as pub_data";
PreparedStatement insertAssignmentsPreparedStatement = null;
try {
insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery);
insertAssignmentsPreparedStatement.execute();
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
jdbcTemplate.execute(insertAssignmentsQuery);
} catch (Exception sqle) {
String errorMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
if ( insertAssignmentsPreparedStatement != null )
insertAssignmentsPreparedStatement.close();
} catch (SQLException sqle2) {
logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
}
}
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
String errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, "", null);
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
@ -260,6 +216,8 @@ public class UrlController {
}
public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(2);
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
@ -294,171 +252,128 @@ public class UrlController {
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available and continue with writing the attempts and the payloads.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
}
// Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons.
String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
final AtomicInteger failedCount = new AtomicInteger(0);
List<Callable<Void>> callableTasks = new ArrayList<>(2);
// One thread will handle the inserts to the "payload" table adn the other to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala).
callableTasks.add(() -> { // Handle inserts to the "payload" table.
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
try {
Long size = payload.getSize();
Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(),
payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(),
payload.getLocation(), payload.getProvenance()};
jdbcTemplate.update(insertIntoPayloadBaseQuery, args, payloadArgTypes);
} catch (Exception sqle) {
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": ", sqle);
failedCount.incrementAndGet();
}
}
return null;
});
callableTasks.add(() -> { // Handle inserts to the "attempt" table.
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes);
} catch (Exception sqle) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage());
failedCount.incrementAndGet();
}
}
return null;
});
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
// Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
String tempInsertQueryName = null;
PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
try {
tempInsertQueryName = "insertIntoPayloadBaseQuery";
preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
tempInsertQueryName = "insertIntoAttemptBaseQuery";
preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
} catch (SQLException sqle) {
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
insertsExecutor.invokeAll(callableTasks);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// TODO - This is a very rare case, but what should be done..?
} catch (Exception e) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
logger.error(errorMsg + sqle.getMessage());
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
try {
con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end.
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
logger.error(errorMsg + sqle.getMessage());
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
int failedQueries = failedCount.get();
String failedQueriesMsg = failedQueries + " out of " + urlReports.size() + " failed to be processed!";
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables" + ((failedQueries > 0) ? (", although " + failedQueriesMsg) : ".")
+ " Going to merge the parquet files for those tables.");
String payloadErrorMsg = null;
int failedCount = 0;
// TODO - Think about handling this loop with multiple threads..
// The Impala-server will handle the synchronization itself..
// Check online what happens with "statement.setPoolable()" does it improves speed? in multi or also in single thread?
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
continue;
}
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
preparedInsertPayloadStatement.setString(1, payload.getId());
preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
preparedInsertPayloadStatement.setString(3, payload.getActual_url());
preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
preparedInsertPayloadStatement.setString(5, payload.getMime_type());
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
String sizeStr = null;
Long size = payload.getSize();
if ( size != null )
sizeStr = String.valueOf(size);
preparedInsertPayloadStatement.setString(6, sizeStr);
preparedInsertPayloadStatement.setString(7, payload.getHash());
preparedInsertPayloadStatement.setString(8, payload.getLocation());
preparedInsertPayloadStatement.setString(9, payload.getProvenance());
preparedInsertPayloadStatement.executeUpdate();
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
preparedInsertAttemptStatement.setString(1, payload.getId());
preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null" too.
preparedInsertAttemptStatement.setString(6, error.getMessage());
preparedInsertAttemptStatement.executeUpdate();
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
}
}//end for-loop
try {
con.commit(); // Commit all the insert-queries to the database (write them to disk).
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
logger.error(errorMsg + "\n" + sqle.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward.
}
if ( payloadErrorMsg != null )
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables.");
else
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con, "", null);
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con, "", null);
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId);
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
try {
con.commit(); // Apply the merges permanently (write them to disk).
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
} catch (SQLException sqle) {
String errorMsg = "Problem when committing changes to the database!";
logger.error(errorMsg + "\n" + sqle.getMessage());
// The statements used in "mergeParquetFiles()" are already closed.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
return ResponseEntity.status(HttpStatus.OK).body(failedQueriesMsg);
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
{
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
@ -485,96 +400,13 @@ public class UrlController {
}
private boolean closeStatements(Statement statement1, Statement statement2, Connection con) {
private String dropCurrentAssignmentTable() {
String dropCurrentAssignmentsQuery = "DROP TABLE " + databaseName + ".current_assignment PURGE";
try {
if ( statement1 != null )
statement1.close();
if ( statement2 != null )
statement2.close();
if ( con != null )
con.close(); // It may have already closed and that's fine.
return true;
} catch (SQLException sqle) {
logger.error("Could not close the statements or the connection with the Impala-database.\n" + sqle.getMessage());
return false;
}
}
@GetMapping("test")
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
try {
new FileUtils(); // Find the input file.
} catch (Exception e) {
logger.error(e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
}
List<Assignment> assignments = new ArrayList<>();
HashMultimap<String, String> loadedIdUrlPairs;
boolean isFirstRun = true;
boolean assignmentsLimitReached = false;
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
// Start loading urls.
while ( true ) {
loadedIdUrlPairs = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
if ( FileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
break;
else
isFirstRun = false;
Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();
for ( Map.Entry<String,String> pair : pairs )
{
if ( assignments.size() >= workerAssignmentsLimit ) {
assignmentsLimitReached = true;
break;
}
int randomNum = GenericUtils.getRandomNumber(1, 5);
assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
}// end pairs-for-loop
if ( assignmentsLimitReached ) {
logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
break;
}
}// end loading-while-loop
Scanner scanner = FileUtils.inputScanner.get();
if ( scanner != null ) // Check if the initial value is null.
scanner.close();
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
private String dropCurrentAssignmentTable(Connection con)
{
String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
PreparedStatement dropCurrentAssignmentsPreparedStatement = null;
try {
dropCurrentAssignmentsPreparedStatement = con.prepareStatement(dropCurrentAssignmentsQuery);
dropCurrentAssignmentsPreparedStatement.execute();
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
return null;
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, "dropCurrentAssignmentsPreparedStatement", dropCurrentAssignmentsPreparedStatement, con, sqle);
} finally {
try {
if ( dropCurrentAssignmentsPreparedStatement != null )
dropCurrentAssignmentsPreparedStatement.close();
} catch (SQLException sqle2) {
logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
}
} catch (Exception e) {
return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e);
}
}

View File

@ -1,10 +0,0 @@
package eu.openaire.urls_controller.util;
public interface ControllerConstants {
int ASSIGNMENTS_LIMIT = 100_000; // The upper assignments-limit the Controller can handle. If the worker's limit is above this one, then the controller's limit is used. Otherwise, the worker's limit will be applied.
int MAX_ATTEMPTS_PER_RECORD = 3; // The maximum times a record can be processed, if each of the previous times failed with a "couldRetry" Error-Class.
}

View File

@ -2,6 +2,7 @@ package eu.openaire.urls_controller.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileInputStream;
@ -12,20 +13,16 @@ import java.nio.file.StandardCopyOption;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@Component
public class FileUnZipper {
private static final Logger logger = LoggerFactory.getLogger(FileUnZipper.class);
public static void unzipFolder(Path source, Path target) throws Exception
{
try ( ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())) )
{
public void unzipFolder(Path source, Path target) throws Exception {
try ( ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())) ) {
// Iterate over the files in zip and un-zip them.
ZipEntry zipEntry = zis.getNextEntry();
while ( zipEntry != null )
{
while ( zipEntry != null ) {
Path targetPath = zipSlipProtect(zipEntry, target);
if ( zipEntry.getName().endsWith(File.separator) ) // If we have a directory.
@ -44,10 +41,8 @@ public class FileUnZipper {
}
}
// Protect from a Zip Slip attack: https://snyk.io/research/zip-slip-vulnerability
public static Path zipSlipProtect(ZipEntry zipEntry, Path targetDir) throws IOException
{
public Path zipSlipProtect(ZipEntry zipEntry, Path targetDir) throws IOException {
Path targetDirResolved = targetDir.resolve(zipEntry.getName());
// Make sure normalized file still has targetDir as its prefix, else throw an exception.
Path normalizePath = targetDirResolved.normalize();
@ -56,5 +51,4 @@ public class FileUnZipper {
}
return normalizePath;
}
}
}

View File

@ -3,12 +3,15 @@ package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.Task;
import eu.openaire.urls_controller.models.UrlReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
@ -17,41 +20,32 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.*;
import java.util.*;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
public class FileUtils {
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
public static ThreadLocal<Scanner> inputScanner = new ThreadLocal<Scanner>(); // Every Thread has its own variable.
private static final ThreadLocal<Integer> fileIndex = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<Integer>();
public static ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<Integer>();
public static final int jsonBatchSize = 3000;
private static final String utf8Charset = "UTF-8";
public static String inputFileFullPath;
private static final String workingDir = System.getProperty("user.dir") + File.separator;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private S3ObjectStore s3ObjectStore;
@Autowired
private FileUnZipper fileUnZipper;
@Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName;
public FileUtils() throws RuntimeException
{
inputFileFullPath = workingDir + "src" + File.separator + "main" + File.separator + "resources";
String resourceFileName = "testInputFiles" + File.separator + "orderedList1000.json";
inputFileFullPath += File.separator + resourceFileName;
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourceFileName);
if ( inputStream == null )
throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");
logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);
FileUtils.inputScanner.set(new Scanner(inputStream, utf8Charset));
fileIndex.set(0); // Re-initialize the file-number-pointer.
unretrievableInputLines.set(0);
duplicateIdUrlEntries.set(0);
}
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
/**
@ -60,8 +54,7 @@ public class FileUtils {
* Renames the clone to the original's name.
* Returns the errorMsg, if an error appears, otherwise is returns "null".
* */
public static String mergeParquetFiles(String tableName, Connection con, String whereClause, String parameter)
{
public String mergeParquetFiles(String tableName, String whereClause, String parameter) {
String errorMsg;
if ( tableName == null ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
@ -76,44 +69,33 @@ public class FileUtils {
if ( parameter == null )
parameter = "";
else
parameter = " '" + parameter + "'"; // This will be a "string-check".
Statement statement;
try {
statement = con.createStatement();
} catch (SQLException sqle) {
errorMsg = "Problem when creating a connection-statement!\n";
logger.error(errorMsg + sqle.getMessage());
return errorMsg;
}
parameter = " '" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
try {
statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
} catch (SQLException sqle) {
jdbcTemplate.execute("CREATE TABLE " + databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + databaseName + "." + tableName + " " + whereClause + parameter);
jdbcTemplate.execute("DROP TABLE " + databaseName + "." + tableName + " PURGE");
jdbcTemplate.execute("ALTER TABLE " + databaseName + "." + tableName + "_tmp RENAME TO " + databaseName + "." + tableName);
jdbcTemplate.execute("COMPUTE STATS " + databaseName + "." + tableName);
} catch (DataAccessException e) {
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()), sqle);
logger.error(errorMsg, e);
return errorMsg;
} finally {
// Make sure we close the statement.
try { statement.close(); }
catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
}
return null; // No errorMsg, everything is fine.
}
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError};
private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
public static UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
{
@Value("services.pdfaggregation.controller.baseTargetLocation")
private String baseTargetLocation;
private final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) {
// The Controller have to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
@ -125,31 +107,15 @@ public class FileUtils {
if ( remoteAddr == null || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr();
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the Impala-connection!");
return UploadFullTextsResponse.databaseError;
}
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
PreparedStatement getFileLocationForHashPreparedStatement = null;
try {
getFileLocationForHashPreparedStatement = con.prepareStatement(getFileLocationForHashQuery);
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
return UploadFullTextsResponse.databaseError;
}
// Get the file-locations.
int numFullTextUrlsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
HashMultimap<String, String> allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ? limit 1" ;
for ( UrlReport urlReport : urlReports )
{
ImpalaConnector.databaseLock.lock();
for ( UrlReport urlReport : urlReports ) {
UrlReport.StatusType statusType = urlReport.getStatus();
if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) ) {
continue;
@ -169,31 +135,25 @@ public class FileUtils {
String fileHash = payload.getHash();
if ( fileHash != null ) {
try {
getFileLocationForHashPreparedStatement.setString(1, fileHash);
} catch (SQLException sqle) {
logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage());
}
try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) {
if ( resultSet.next() ) { // Move the "cursor" to the first row. If there is any data, then take the first result (there should not be more, but we still want the first anyway).
fileLocation = resultSet.getString(1);
if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
continue; // Do not request the file from the worker, it's already uploaded. Move on.
}
}
fileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, new int[] {Types.VARCHAR}, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection.
}
if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
continue; // Do not request the file from the worker, it's already uploaded. Move on.
}
}
// If the full-text of this record was not found by a previous batch..
// If the full-text of this record was not found by a previous batch...
fileLocation = payload.getLocation();
if ( fileLocation != null ) { // If the docFile was downloaded (without an error)..
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
@ -207,18 +167,9 @@ public class FileUtils {
allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId()); // The keys and the values are not duplicate. Task with ID-1 might have an "ID-1.pdf" file.
// While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
}
}
}// end-for
// Close the Prepared Statement.
try {
if ( getFileLocationForHashPreparedStatement != null )
getFileLocationForHashPreparedStatement.close();
} catch (SQLException sqle) {
logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
} finally {
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
ImpalaConnector.closeConnection(con);
}
ImpalaConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
@ -252,8 +203,7 @@ public class FileUtils {
File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);
int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter )
{
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
if ( conn == null ) {
@ -280,7 +230,7 @@ public class FileUtils {
//logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
String[] fileNames = new File(targetDirectory).list();
if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file".
@ -311,10 +261,12 @@ public class FileUtils {
// At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName.
// Let's try to upload the file to S3 and update the payloads of all related IDs, either in successful upload or not.
String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath);
if ( s3Url != null ) {
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); // It checks weather (s3Url != null) and acts accordingly.
try {
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url);
numUploadedFiles ++;
} catch (Exception e) {
logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
}
// Else, the record will have its file-data set to "null", in the end of this method.
}
@ -342,9 +294,7 @@ public class FileUtils {
}
}
private static HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId)
{
private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) {
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
@ -366,9 +316,7 @@ public class FileUtils {
return conn;
}
private static String getErrorMessageFromResponseBody(HttpURLConnection conn)
{
private String getErrorMessageFromResponseBody(HttpURLConnection conn) {
StringBuilder errorMsgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream())) ) { // Try-with-resources
String inputLine;
@ -387,9 +335,7 @@ public class FileUtils {
}
}
private static List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
{
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch) {
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
@ -406,12 +352,9 @@ public class FileUtils {
return fileNamesOfCurBatch;
}
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) {
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
private static final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
// TODO - Make it THREAD-LOCAL, if we move to multi-thread batch requests.
private static String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
{
sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){
@ -419,15 +362,12 @@ public class FileUtils {
if ( j < (numFullTextsCurBatch -1) )
sb.append(",");
}
String requestUrl = sb.toString();
sb.setLength(0); // Reset for the next batch.
return requestUrl;
return sb.toString();
}
private final int bufferSize = 20971520; // 20 MB
private static final int bufferSize = 20971520; // 20 MB
public static boolean saveZipFile(HttpURLConnection conn, File zipFile)
{
public boolean saveZipFile(HttpURLConnection conn, File zipFile) {
InputStream inStream = null;
FileOutputStream outStream = null;
try {
@ -454,9 +394,7 @@ public class FileUtils {
}
}
private static boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap)
{
private boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap) {
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName);
if ( !matcher.matches() ) {
@ -492,16 +430,13 @@ public class FileUtils {
return true;
}
/**
* This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
* Then, we don't want any "links" to locally stored files, which will be deleted.
* @param urlReports
* @return
*/
public static void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports)
{
public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports) {
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null )
@ -509,22 +444,19 @@ public class FileUtils {
}
}
private static void replaceNotUploadedFileLocations(List<UrlReport> urlReports)
{
private void replaceNotUploadedFileLocations(List<UrlReport> urlReports) {
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null ) {
String fileLocation = payload.getLocation();
if ( (fileLocation != null) && (! fileLocation.startsWith(S3ObjectStoreMinIO.s3Protocol)) )
if ( (fileLocation != null) && (! s3ObjectStore.isLocationInStore(fileLocation)) )
setUnretrievedFullText(payload);
}
}
}
public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames)
{
public void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames) {
for ( String fileName : fileNames ) {
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName);
@ -543,9 +475,7 @@ public class FileUtils {
}
}
public static void setUnretrievedFullText(Payload payload)
{
public void setUnretrievedFullText(Payload payload) {
// Mark the full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
payload.setLocation(null);
payload.setHash(null);
@ -553,15 +483,13 @@ public class FileUtils {
payload.setSize(null);
}
/**
* Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads.
* @param fileIDs
* @param payloadsHashMultimap
* @param s3Url
*/
public static void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url)
{
public void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url) {
for ( String id : fileIDs ) {
Set<Payload> payloads = payloadsHashMultimap.get(id);
if ( payloads.isEmpty() ) {
@ -575,8 +503,7 @@ public class FileUtils {
}
}
public static boolean deleteDirectory(File curBatchDir) {
public boolean deleteDirectory(File curBatchDir) {
try {
org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
return true;
@ -585,136 +512,4 @@ public class FileUtils {
return false;
}
}
// Shortens an SQL-exception message before logging: the actual message is followed by the (very long) batch-content, which would make the logs unreadable.
private static String getCutBatchExceptionMessage(String sqleMessage)
{
	final int maxEnding = 1500;	// Keep at most this many characters of the message.
	return (sqleMessage.length() > maxEnding)
			? (sqleMessage.substring(0, maxEnding) + "...")
			: sqleMessage;
}
// This is currently not used, but it may be useful in a future scenario.
// Counts the lines of the inputFile. Returns -1 in case of an I/O error.
private static long getInputFileLinesNum()
{
	// "Files.lines()" keeps the file open until the stream is closed; use try-with-resources to avoid a file-descriptor leak.
	try ( java.util.stream.Stream<String> lines = Files.lines(Paths.get(inputFileFullPath)) ) {
		long numOfLines = lines.count();
		logger.debug("The numOfLines in the inputFile is " + numOfLines);
		return numOfLines;
	} catch (IOException e) {
		logger.error("Could not retrieve the numOfLines. " + e);
		return -1;
	}
}
/**
 * This method decodes a Json String and returns its members.
 * @param jsonLine String
 * @return a "Task" holding the extracted id and url, or "null" upon a parsing-error or a missing url.
 */
public static Task jsonDecoder(String jsonLine)
{
	String id;
	String url;

	// Parse the json-line and extract the "id" and "url" members.
	try {
		JSONObject jsonObject = new JSONObject(jsonLine);
		id = jsonObject.get("id").toString();
		url = jsonObject.get("url").toString();
	} catch (JSONException je) {
		logger.warn("JSONException caught when tried to parse and extract values from jsonLine: \t" + jsonLine, je);
		return null;
	}

	if ( url.isEmpty() ) {
		if ( !id.isEmpty() ) // If we only have the id, then go and log it.
			logger.warn("The url was not found for id: \"" + id + "\"");
		return null;
	}

	return new Task(id, url, null);
}
/**
 * This method parses a Json file and extracts the urls, along with the IDs.
 * @return HashMultimap<String, String>
 */
public static HashMultimap<String, String> getNextIdUrlPairBatchFromJson()
{
	final int expectedPathsPerID = 5;
	final int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;
	HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);

	final int batchEnd = fileIndex.get() + jsonBatchSize;	// The index right after the current url-batch.

	// While (!EOF) and inside the current url-batch, iterate through lines.
	while ( inputScanner.get().hasNextLine() && (fileIndex.get() < batchEnd) )
	{
		// Take the next line and advance the file-number-pointer.
		String line = inputScanner.get().nextLine();
		//logger.debug("Loaded from inputFile: " + line); // DEBUG!
		fileIndex.set(fileIndex.get() + 1);

		if ( line.isEmpty() ) {
			unretrievableInputLines.set(unretrievableInputLines.get() + 1);
			continue;
		}

		Task idUrlTuple = jsonDecoder(line);	// Decode the jsonLine and take the two attributes.
		if ( idUrlTuple == null ) {
			logger.warn("A problematic inputLine found: \t" + line);
			unretrievableInputLines.set(unretrievableInputLines.get() + 1);
			continue;
		}

		// A "false" return-value means we have a duplicate url in the input. Log it here, as it cannot pass through the HashMultimap. Both this and the original entry may still be/give a docUrl.
		if ( !idAndUrlMappedInput.put(idUrlTuple.getId(), idUrlTuple.getUrl()) )
			duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() + 1);
	}

	return idAndUrlMappedInput;
}
/**
 * This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
 * In the end, it gives the total number of urls that were processed.
 * @return loadedUrls
 */
public static int getCurrentlyLoadedUrls()
{
	int readLines = FileUtils.fileIndex.get();
	int unusableLines = FileUtils.unretrievableInputLines.get();
	return (readLines - unusableLines);
}
/**
 * This method checks if there is no more input-data and returns true in that case.
 * Otherwise, it returns false, if there is more input-data to be loaded.
 * @param isEmptyOfData whether the last attempt to load input-data yielded nothing
 * @param isFirstRun whether this is the very first loading-attempt
 * @return finished loading / not finished
 */
public static boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun)
{
	if ( !isEmptyOfData )
		return false;	// There is more input-data to be loaded.

	if ( isFirstRun )
		logger.error("Could not retrieve any urls from the inputFile!");
	else
		logger.debug("Done loading " + FileUtils.getCurrentlyLoadedUrls() + " urls from the inputFile.");

	return true;
}
}
}

View File

@ -0,0 +1,145 @@
package eu.openaire.urls_controller.util;
import io.minio.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
public class S3ObjectStore {

	private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);

	// The url-protocol-prefix of the file-locations stored in this ObjectStore.
	private static final String s3Protocol = "s3://";

	@Value("${services.pdfaggregation.controller.s3.endpoint}")
	private String endpoint = null;

	@Value("${services.pdfaggregation.controller.s3.accessKey}")
	private String accessKey = null;

	@Value("${services.pdfaggregation.controller.s3.secretKey}")
	private String secretKey = null;

	@Value("${services.pdfaggregation.controller.s3.region}")
	private String region = null;

	@Value("${services.pdfaggregation.controller.s3.bucketName}")
	private String bucketName = null;

	@Value("${services.pdfaggregation.controller.s3.shouldEmptyBucket}")
	private boolean shouldEmptyBucket = false; // Set true only for testing!

	@Value("${services.pdfaggregation.controller.s3.shouldShowAllS3Buckets}")
	private boolean shouldShowAllS3Buckets = false;

	private MinioClient minioClient;

	// Matches the file-extension, including its leading dot (e.g. ".pdf"). Compiled once, since Pattern-compilation is expensive.
	private static final Pattern EXTENSION_PATTERN = Pattern.compile("(\\.[^.]+)$");

	/**
	 * Creates the minio-client and makes sure the configured bucket exists, creating it if it does not.
	 * Depending on configuration, it may also empty the bucket (testing only) or list all buckets (debugging).
	 * @throws Exception if the client cannot reach the ObjectStore or a bucket-operation fails
	 */
	@PostConstruct
	public void init() throws Exception {
		this.minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).region(region).build();

		boolean bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());

		// Keep "shouldEmptyBucket" disabled in production, to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing).
		if ( bucketExists && shouldEmptyBucket ) {
			emptyBucket(bucketName, false);
			//throw new RuntimeException("stop just for test!");
		}

		// Make the bucket, if not exist.
		if ( !bucketExists ) {
			logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it..");
			minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
		} else
			logger.debug("Bucket \"" + bucketName + "\" already exists.");

		if ( shouldShowAllS3Buckets ) {
			try {
				List<Bucket> buckets = minioClient.listBuckets();
				logger.debug("The buckets in the S3 ObjectStore are:");
				for ( Bucket bucket : buckets ) {
					logger.debug(bucket.name());
				}
			} catch (Exception e) {
				logger.warn("Could not listBuckets: " + e.getMessage());
			}
		}
	}

	/**
	 * Uploads a local file to the S3 ObjectStore, setting the content-type based on the file-extension.
	 * @param fileObjKeyName = "**File object key name**";
	 * @param fileFullPath = "**Path of the file to upload**";
	 * @return the url of the uploaded file (be aware: this url works only if the access to the bucket is public)
	 * @throws Exception if the upload fails
	 */
	public String uploadToS3(String fileObjKeyName, String fileFullPath) throws Exception {
		String contentType;

		// Take the Matcher to retrieve the extension.
		Matcher extensionMatcher = EXTENSION_PATTERN.matcher(fileFullPath);
		if ( extensionMatcher.find() ) {
			String extension = extensionMatcher.group(1);	// Includes the leading dot, e.g. ".pdf".
			// NOTE: the extension carries its leading dot, so it must be compared against ".pdf" (comparing with "pdf" could never match).
			if ( ".pdf".equals(extension) )
				contentType = "application/pdf";
			/*else if ( *//* TODO - other-extension-match *//* )
				contentType = "application/EXTENSION"; */
			else
				contentType = "application/pdf";	// Default.
		} else {
			logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType.");
			contentType = "application/pdf";
		}

		minioClient.uploadObject(UploadObjectArgs.builder()
				.bucket(bucketName)
				.object(fileObjKeyName).filename(fileFullPath)
				.contentType(contentType).build());

		// TODO - What if the fileObjKeyName already exists? Right now it gets overwritten (unless we add versioning, which is not currently supported by our S3ObjectStore).
		// Each Worker handles some of these cases, but in case of id-urls splitting between different workers or re-attempting some temporarily faulty urls later,
		// duplicate fileNames may appear and cause file-overwriting from the part of S3ObjectStore.

		String s3Url = s3Protocol + bucketName + "/" + fileObjKeyName;	// Be aware: This url works only if the access to the bucket is public.
		//logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url);
		return s3Url;
	}

	/**
	 * Deletes all objects of the given bucket and, optionally, the (emptied) bucket itself.
	 * @param bucketName the bucket to empty
	 * @param shouldDeleteBucket whether to also delete the bucket, after emptying it
	 * @throws Exception if listing the objects or removing the bucket fails
	 */
	public void emptyBucket(String bucketName, boolean shouldDeleteBucket) throws Exception {
		logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"");

		// First list the objects of the bucket.
		Iterable<Result<Item>> results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());

		// Then, delete the objects.
		for ( Result<Item> resultItem : results ) {
			String objectName = null;
			try {
				objectName = resultItem.get().objectName();	// This may throw, when the object cannot be retrieved.
				deleteFile(objectName, bucketName);
			} catch (Exception e) {
				// Do not call "resultItem.get()" in here: if it threw above, it would simply throw again and escape this catch-block.
				logger.warn("Could not remove " + objectName);
			}
		}

		if ( shouldDeleteBucket ) {
			// Lastly, delete the empty bucket.
			minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
		}

		logger.info("Bucket " + bucketName + " was " + (shouldDeleteBucket ? "deleted!" : "emptied!"));
	}

	/**
	 * Checks whether the given file-location points inside this S3 ObjectStore (i.e. it starts with the s3-protocol-prefix).
	 */
	public boolean isLocationInStore(String location) {
		return location.startsWith(s3Protocol);
	}

	// Removes a single object from the given bucket.
	private void deleteFile(String fileObjKeyName, String bucketName) throws Exception {
		minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(fileObjKeyName).build());
	}

}

View File

@ -1,231 +0,0 @@
package eu.openaire.urls_controller.util;
import io.minio.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class S3ObjectStoreMinIO {

    private static final Logger logger = LoggerFactory.getLogger(S3ObjectStoreMinIO.class);

    public static String s3Protocol = "s3://";

    // Connection parameters; filled-in from the credentials-file by the constructor.
    private static String endpoint = null;
    private static String accessKey = null;
    private static String secretKey = null;
    private static String region = null;
    private static String bucketName = null;

    private static MinioClient minioClient;

    public static final boolean shouldEmptyBucket = false; // Set true only for testing!
    public static final String credentialsFilePath = System.getProperty("user.dir") + File.separator + "S3_minIO_credentials.txt";
    private static final boolean shouldShowAllS3Buckets = false;

    /**
     * Reads the S3 credentials from {@link #credentialsFilePath}, builds the {@link MinioClient}
     * and guarantees that the configured bucket exists (creating it when missing).
     * This must be called before any other methods.
     * On any unrecoverable error the JVM exits with a distinct status-code (53 - 56).
     */
    public S3ObjectStoreMinIO()
    {
        // Take the credentials from the file.
        try {
            File credentialsFile = new File(credentialsFilePath);
            if ( !credentialsFile.exists() ) {
                throw new RuntimeException("credentialsFile \"" + credentialsFilePath + "\" does not exists!");
            }
            // The try-with-resources closes the Scanner automatically, even when an exception is thrown.
            try ( Scanner myReader = new Scanner(credentialsFile) ) {
                if ( myReader.hasNextLine() ) {
                    // Expected single-line format: endpoint,accessKey,secretKey,region,bucketName
                    String[] credentials = myReader.nextLine().split(",");
                    if ( credentials.length < 5 ) {
                        throw new RuntimeException("Not all credentials were retrieved from file \"" + credentialsFilePath + "\"!");
                    }
                    endpoint = credentials[0].trim();
                    accessKey = credentials[1].trim();
                    secretKey = credentials[2].trim();
                    region = credentials[3].trim();
                    bucketName = credentials[4].trim();
                }
            }
        } catch (Exception e) {
            String errorMsg = "An error prevented the retrieval of the minIO credentials from the file: " + credentialsFilePath + "\n" + e.getMessage();
            logger.error(errorMsg, e);
            System.err.println(errorMsg);
            System.exit(53);
        }

        if ( (endpoint == null) || (accessKey == null) || (secretKey == null) || (region == null) || (bucketName == null) ) {
            String errorMsg = "No \"endpoint\" or/and \"accessKey\" or/and \"secretKey\" or/and \"region\" or/and \"bucketName\" could be retrieved from the file: " + credentialsFilePath;
            logger.error(errorMsg);
            System.err.println(errorMsg);
            System.exit(54);
        }

        // It's not safe, nor helpful to show the credentials in the logs.
        minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).region(region).build();

        boolean bucketExists = false;
        try {
            bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
        } catch (Exception e) {
            String errorMsg = "There was a problem while checking if the bucket \"" + bucketName + "\" exists!\n" + e.getMessage();
            logger.error(errorMsg, e); // Keep the stacktrace, to ease debugging of connection issues.
            System.err.println(errorMsg);
            System.exit(55);
        }

        // Keep this commented-out to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing).
        /* if ( bucketExists && shouldEmptyBucket ) {
            emptyBucket(bucketName, false);
            //throw new RuntimeException("stop just for test!");
        }*/

        // Make the bucket, if not exist.
        try {
            if ( !bucketExists ) {
                logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it..");
                minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
            } else
                logger.debug("Bucket \"" + bucketName + "\" already exists.");
        } catch (Exception e) {
            String errorMsg = "Could not create the bucket \"" + bucketName + "\"!";
            logger.error(errorMsg, e);
            System.err.println(errorMsg);
            System.exit(56);
        }

        if ( shouldShowAllS3Buckets ) {
            try {
                List<Bucket> buckets = minioClient.listBuckets();
                logger.debug("The buckets in the S3 ObjectStore are:");
                for ( Bucket bucket : buckets ) {
                    logger.debug(bucket.name());
                }
            } catch (Exception e) {
                logger.warn("Could not listBuckets: " + e.getMessage());
            }
        }
    }

    // Matches the last dot-separated suffix of a path; group(1) INCLUDES the leading dot (e.g. ".pdf").
    public static final Pattern EXTENSION_PATTERN = Pattern.compile("(\\.[^.]+)$");

    /**
     * Uploads a local file to the configured bucket of the S3 ObjectStore.
     * @param fileObjKeyName the object-key under which the file will be stored in the bucket
     * @param fileFullPath the full local path of the file to upload
     * @return the "s3://&lt;bucket&gt;/&lt;key&gt;" url of the uploaded object, or null in case of an upload-error
     */
    public static String uploadToS3(String fileObjKeyName, String fileFullPath)
    {
        String contentType;
        // Take the Matcher to retrieve the extension.
        Matcher extensionMatcher = EXTENSION_PATTERN.matcher(fileFullPath);
        if ( extensionMatcher.find() ) {
            // Bug-fix: the captured group contains the leading dot (".pdf"), so the old check (extension.equals("pdf")) could never match.
            // (The old "group(0) == null" branch was also dead: after a successful find(), group(0) is non-null.)
            String extension = extensionMatcher.group(1);
            if ( ".pdf".equals(extension) )
                contentType = "application/pdf";
            /*else if ( *//* TODO - other-extension-match *//* )
                contentType = "application/pdf"; */
            else
                contentType = "application/pdf"; // Default, until more mime-types are handled.
        } else {
            logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType.");
            contentType = "application/pdf";
        }

        ObjectWriteResponse response;
        try {
            response = minioClient.uploadObject(UploadObjectArgs.builder()
                    .bucket(bucketName)
                    .object(fileObjKeyName).filename(fileFullPath)
                    .contentType(contentType).build());
            // TODO - What if the fileObjKeyName already exists?
            // Right now it gets overwritten (unless we add versioning, which is irrelevant for different objects..)
        } catch (Exception e) {
            logger.error("Could not upload the file \"" + fileObjKeyName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
            return null;
        }

        // The urls of OpenAIRE-S3 do not respond PUBLICLY (that's ok). But they also return a 403-Forbidden when requested from a machine with access to them.
        // An example for such urls is: https://<DOMAIN>//<BUCKET>/doiboost____::3f2fb79f97627fd94c45e694d2a8aa30.pdf
        // That type of url is usable only in the test S3-Object-Store.
        // We prefer the following scheme: s3://<BUCKET>//doiboost____::3f2fb79f97627fd94c45e694d2a8aa30.pdf
        String s3Url = s3Protocol + bucketName + "/" + fileObjKeyName;
        //logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url);
        return s3Url;
    }

    /**
     * Deletes every object inside the given bucket and, optionally, the (then-empty) bucket itself.
     * @param bucketName the bucket to empty
     * @param shouldDeleteBucket when true, the bucket itself is removed after being emptied
     * @return true on full success, false when listing, any single deletion or the bucket-removal failed
     */
    public static boolean emptyBucket(String bucketName, boolean shouldDeleteBucket)
    {
        logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"");

        // First list the objects of the bucket.
        Iterable<Result<Item>> results;
        try {
            results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
        } catch (Exception e) {
            logger.error("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!");
            return false;
        }

        // Then, delete the objects. A single failure aborts, since a non-empty bucket cannot be removed.
        for ( Result<Item> resultItem : results ) {
            try {
                if ( !deleteFile(resultItem.get().objectName(), bucketName) ) {
                    logger.error("Cannot proceed with bucket deletion, since only an empty bucket can be removed!");
                    return false;
                }
            } catch (Exception e) {
                logger.error("Error getting the object from resultItem: " + resultItem.toString() + "\nThe bucket \"" + bucketName + "\" will not be able to be deleted! Exception message: " + e.getMessage());
                return false;
            }
        }

        if ( shouldDeleteBucket ) {
            // Lastly, delete the empty bucket.
            try {
                minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
            } catch (Exception e) {
                logger.error("Could not delete the bucket \"" + bucketName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
                return false;
            }
        }

        logger.info("Bucket " + bucketName + " was " + (shouldDeleteBucket ? "deleted!" : "emptied!"));
        return true;
    }

    /**
     * Removes a single object from the given bucket.
     * @param fileObjKeyName the object-key of the file to delete
     * @param bucketName the bucket which contains the object
     * @return true on success, false when the removal failed
     */
    public static boolean deleteFile(String fileObjKeyName, String bucketName)
    {
        try {
            minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(fileObjKeyName).build());
        } catch (Exception e) {
            logger.error("Could not delete the file \"" + fileObjKeyName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
            return false;
        }
        return true;
    }
}

View File

@ -0,0 +1,144 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.models.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.InputStream;
import java.util.Scanner;
@Component
public class TestFileUtils {

    private static final Logger logger = LoggerFactory.getLogger(TestFileUtils.class);

    // Test-input resource: one json-record ({"id":..,"url":..}) per line.
    public final Resource testResource = new ClassPathResource("testInputFiles/orderedList1000.json");

    // Per-thread counter of duplicate (id, url) pairs found in the input.
    public final ThreadLocal<Integer> duplicateIdUrlEntries;
    // Per-thread Scanner over the (shared) input-stream of "testResource".
    public final ThreadLocal<Scanner> inputScanner;

    private final int jsonBatchSize = 3000;
    // Per-thread counter of input-lines consumed so far.
    private final ThreadLocal<Integer> fileIndex;
    // Per-thread counter of empty or non-parsable input-lines.
    private final ThreadLocal<Integer> unretrievableInputLines;

    private static final String UTF8_CHARSET = "UTF-8";

    /**
     * Opens the test-resource and initializes the per-thread reading state.
     * @throws IOException if the test-resource cannot be opened
     */
    public TestFileUtils() throws IOException {
        InputStream inputStream = testResource.getInputStream();
        if ( inputStream == null ) // NOTE(review): getInputStream() is documented to throw instead of returning null; kept as a defensive check.
            throw new RuntimeException("No resourceFile was found with name \"" + testResource.getFilename() + "\"!");

        inputScanner = ThreadLocal.withInitial(() -> new Scanner(inputStream, UTF8_CHARSET));
        fileIndex = ThreadLocal.withInitial(() -> 0);
        unretrievableInputLines = ThreadLocal.withInitial(() -> 0);
        duplicateIdUrlEntries = ThreadLocal.withInitial(() -> 0);
    }

    /**
     * This method parses a Json file and extracts the urls, along with the IDs.
     * It reads up to "jsonBatchSize" lines per call, skipping empty and non-parsable lines.
     * @return HashMultimap&lt;String, String&gt; mapping each id to its url(s)
     */
    public HashMultimap<String, String> getNextIdUrlPairBatchFromJson() {
        Task inputIdUrlTuple;
        int expectedPathsPerID = 5;
        int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;

        HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);

        int curBeginning = fileIndex.get();
        while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
        {// While (!EOF) and inside the current url-batch, iterate through lines.
            //logger.debug("fileIndex: " + fileIndex.get()); // DEBUG!

            String retrievedLineStr = inputScanner.get().nextLine();
            //logger.debug("Loaded from inputFile: " + retrievedLineStr); // DEBUG!

            fileIndex.set(fileIndex.get() +1);

            if ( retrievedLineStr.isEmpty() ) {
                unretrievableInputLines.set(unretrievableInputLines.get() +1);
                continue;
            }

            if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) { // Decode the jsonLine and take the two attributes.
                logger.warn("A problematic inputLine found: \t" + retrievedLineStr);
                unretrievableInputLines.set(unretrievableInputLines.get() +1);
                continue;
            }

            if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) ) { // We have a duplicate url in the input.. log it here as we cannot pass it through the HashMultimap. It's possible that this as well as the original might be/give a docUrl.
                duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1);
            }
        }
        return idAndUrlMappedInput;
    }

    /**
     * This method decodes a Json String and returns its members.
     * @param jsonLine String containing the json-record
     * @return a Task holding the id and url, or null when parsing fails or the url is missing
     */
    private Task jsonDecoder(String jsonLine) {
        // Get ID and url and put them in the HashMap
        String idStr = null;
        String urlStr = null;
        try {
            JSONObject jObj = new JSONObject(jsonLine); // Construct a JSONObject from the retrieved jsonLine.
            idStr = jObj.get("id").toString();
            urlStr = jObj.get("url").toString();
        } catch (JSONException je) {
            logger.warn("JSONException caught when tried to parse and extract values from jsonLine: \t" + jsonLine, je);
            return null;
        }

        if ( urlStr.isEmpty() ) {
            if ( !idStr.isEmpty() ) // If we only have the id, then go and log it.
                logger.warn("The url was not found for id: \"" + idStr + "\"");
            return null;
        }
        return new Task(idStr, urlStr, null);
    }

    /**
     * This method checks if there is no more input-data and returns true in that case.
     * Otherwise, it returns false, if there is more input-data to be loaded.
     * @param isEmptyOfData whether the last batch came back empty
     * @param isFirstRun whether this is the first loading-attempt (an empty first run is an error)
     * @return finished loading / not finished
     */
    public boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun) {
        if ( isEmptyOfData ) {
            if ( isFirstRun )
                logger.error("Could not retrieve any urls from the inputFile!");
            else
                logger.debug("Done loading " + getCurrentlyLoadedUrls() + " urls from the inputFile.");
            return true;
        }
        return false;
    }

    /**
     * This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
     * @return loadedUrls
     */
    private int getCurrentlyLoadedUrls() { // In the end, it gives the total number of urls we have processed.
        return fileIndex.get() - unretrievableInputLines.get();
    }
}

View File

@ -13,73 +13,72 @@ import java.net.URL;
public class UriBuilder {

    private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class);

    // The fully-assembled public base-url of this server, e.g. "http://<host>:<port>/api/". Set once at startup.
    public static String baseUrl = null;

    /**
     * Assembles the public base-url of this service from the Spring-environment:
     * scheme ("http"/"https"), public host (falling back to the loopback hostname),
     * port and servlet context-path.
     * Exits the JVM when the mandatory "server.port" property is missing.
     * @param environment the Spring Environment holding the "server.*" properties
     */
    public UriBuilder(Environment environment)
    {
        baseUrl = "http";
        String sslEnabled = environment.getProperty("server.ssl.enabled");
        if ( sslEnabled == null ) { // It's expected to not exist if there is no SSL-configuration.
            logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP..");
            sslEnabled = "false";
        }
        baseUrl += sslEnabled.equals("true") ? "s" : "";
        baseUrl += "://";

        String hostName = getPublicIP();
        if ( hostName == null )
            hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null.
        baseUrl += hostName;

        String serverPort = environment.getProperty("server.port");
        if ( serverPort == null ) { // This is unacceptable!
            logger.error("No property \"server.port\" was found in \"application.properties\"!");
            System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway.
        }
        baseUrl += ":" + serverPort;

        String baseInternalPath = environment.getProperty("server.servlet.context-path");
        if ( baseInternalPath != null ) {
            // Normalize the context-path so the base-url always has exactly one slash on each side of it.
            if ( !baseInternalPath.startsWith("/") )
                baseUrl += "/";
            baseUrl += baseInternalPath;
            if ( !baseInternalPath.endsWith("/") )
                baseUrl += "/";
        } else {
            logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected.
            baseUrl += "/";
        }

        logger.debug("ServerBaseURL: " + baseUrl);
    }

    /**
     * Retrieves the public IP-address of this machine by querying the external "api.ipify.org" service.
     * @return the public IP, or null when the lookup fails (e.g. no internet-access)
     */
    private static String getPublicIP()
    {
        URL url_name;
        try {
            url_name = new URL("https://api.ipify.org/");
        } catch (MalformedURLException mue) {
            logger.warn(mue.getMessage());
            return null;
        }
        try ( BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream())) ) {
            return bf.readLine().trim();
        } catch (Exception e) {
            logger.warn("Cannot get the publicIP address for this machine!", e);
            return null;
        }
    }

    public static String getBaseUrl() {
        return baseUrl;
    }

    public static void setBaseUrl(String baseUrl) {
        UriBuilder.baseUrl = baseUrl;
    }

    // NOTE: a full commented-out duplicate of this class (~67 lines of dead code) was removed.
}

View File

@ -1,30 +1,28 @@
# HTTPS CONFIGURATION
#server.port = 8443
#server.ssl.enabled = true
#server.ssl.key-store = src/main/resources/keystore.p12
#server.ssl.key-store-type = PKCS12
#server.ssl.key-alias = tomcat
#server.ssl.key-store-password = urls_controller_project
#server.tomcat.remoteip.remote-ip-header = x-your-remote-ip-header
#server.tomcat.remoteip.protocol-header = x-your-protocol-header
#server.error.include-stacktrace=never
# HTTP CONFIGURATION
server.port = 1880
# Server api path
server.servlet.context-path=/api
#Service config
services.pdfaggregation.controller.db.oldDatabaseName = pdfaggregation_i
services.pdfaggregation.controller.db.databaseName = pdfAggregationDatabase
services.pdfaggregation.controller.baseTargetLocation = /tmp/
services.pdfaggregation.controller.maxAttemptsPerRecord = 3
services.pdfaggregation.controller.assignmentLimit = 10000
services.pdfaggregation.controller.s3.endpoint = xa
services.pdfaggregation.controller.s3.accessKey = xa
services.pdfaggregation.controller.s3.secretKey = xa
services.pdfaggregation.controller.s3.region = xa
services.pdfaggregation.controller.s3.bucketName = xa
services.pdfaggregation.controller.s3.shouldEmptyBucket = false
services.pdfaggregation.controller.s3.shouldShowAllS3Buckets = true
# Database
spring.impala.url = jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/
# Note: The "UseNativeQuery" does not work with the PreparedStatements! Also, the url does not work without the ending "/"
# The username and the password do not matter, since this app is always run on a pre-authenticated machine.
spring.impala.oldDatabaseName = pdfaggregation_i
spring.impala.databaseName = pdfAggregationDatabase
spring.impala.driver-class-name = com.cloudera.impala.jdbc41.Driver
spring.datasource.url=jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/
spring.datasource.username=
spring.datasource.password=
spring.datasource.driver-class-name=com.cloudera.impala.jdbc41.Driver
spring.datasource.hikari.pool-name=ControllerPool
spring.datasource.hikari.maximumPoolSize=20
@ -33,7 +31,6 @@ spring.datasource.hikari.minimumIdle=4
spring.datasource.hikari.connectionTimeout=30000
spring.datasource.hikari.idleTimeout=600000
# LOGGING LEVELS
logging.level.root=INFO
logging.level.org.springframework.web=INFO
@ -41,17 +38,13 @@ logging.level.org.springframework.security=WARN
logging.level.eu.openaire.urls_controller=DEBUG
spring.output.ansi.enabled=always
## MULTIPART (MultipartProperties)
# Enable multipart uploads
spring.servlet.multipart.enabled=true
# Threshold after which files are written to disk.
spring.servlet.multipart.file-size-threshold=2KB
# Max file size.
spring.servlet.multipart.max-file-size=200MB
# Max Request Size
spring.servlet.multipart.max-request-size=215MB
spring.servlet.multipart.max-request-size=215MB