forked from lsmyrnaios/UrlsController
finished merge
commit 6dde8c0faa
.gitignore
@@ -0,0 +1,6 @@
.gradle/
.idea/
build/
derby.log
logs/
src/main/main.iml

build.gradle
@@ -48,10 +1,17 @@ dependencies {
implementation 'io.minio:minio:8.3.5'

// https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' // This is required by the minio, as Spring uses a version which is not supported by minio.
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
// This is required by the minio, as Spring uses a version which is not supported by minio.

// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation group: 'com.cloudera.impala', name: 'jdbc', version: '2.5.31'
implementation("com.cloudera.impala:jdbc:2.5.31") {
    exclude group: 'org.slf4j', module: 'slf4j-log4j12'
    exclude group: 'org.apache.derby', module: 'derby'
    exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all'
    exclude group: 'log4j', module: 'log4j'
    exclude group: 'log4j', module: 'apache-log4j-extras'
}

testImplementation group: 'org.springframework.security', name: 'spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"
src/main/java/eu/openaire/urls_controller/Application.java
@@ -1,8 +1,5 @@
package eu.openaire.urls_controller;

import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.util.S3ObjectStoreMinIO;
import eu.openaire.urls_controller.util.UriBuilder;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@@ -14,7 +11,6 @@ import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;

import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.Collections;

@@ -23,7 +19,6 @@ import java.util.Collections;
public class Application {

    public static void main(String[] args) {
        new S3ObjectStoreMinIO();
        SpringApplication.run(Application.class, args);
    }

@@ -38,20 +33,10 @@ public class Application {
        source.registerCorsConfiguration("/**", configuration);
        return source;
    }

    @PreDestroy
    public static void preDestroy()
    {
        if ( ImpalaConnector.hikariDataSource != null )
            ImpalaConnector.hikariDataSource.close();
    }

    @Bean
    public CommandLineRunner setServerBaseUrl(Environment environment)
    {
        return args -> new UriBuilder(environment);
    }
//
//    @Bean
//    public CommandLineRunner setServerBaseUrl(Environment environment) {
//        return args -> new UriBuilder(environment);
//    }

}
src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java
@@ -1,256 +1,79 @@
|
|||
package eu.openaire.urls_controller.configuration;
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.beans.PropertyVetoException;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.sql.*;
|
||||
import java.util.Properties;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
||||
@Repository
|
||||
public final class ImpalaConnector {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ImpalaConnector.class);
|
||||
|
||||
public static String impalaDriver;
|
||||
public static String impalaConnectionUrl;
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Value("services.pdfaggregation.controller.db.oldDatabaseName:pdfaggregation_i")
|
||||
public static String oldDatabaseName;
|
||||
@Value("services.pdfaggregation.controller.db.databaseName:pdfAggregationDatabase")
|
||||
public static String databaseName;
|
||||
public static String poolName;
|
||||
public static int hikariMaxConnectionPoolSize;
|
||||
public static int hikariMinIdleConnections;
|
||||
public static int hikariConnectionTimeOut;
|
||||
public static int hikariIdleTimeOut;
|
||||
public static int hikariMaxLifetime;
|
||||
|
||||
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
|
||||
|
||||
public static HikariDataSource hikariDataSource;
|
||||
|
||||
private static final ImpalaConnector singletonObject = new ImpalaConnector();
|
||||
|
||||
public static ImpalaConnector getInstance()
|
||||
{
|
||||
return singletonObject;
|
||||
}
|
||||
|
||||
|
||||
public ImpalaConnector()
|
||||
{
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
|
||||
|
||||
try {
|
||||
String dbSettingsPropertyFile = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "application.properties";
|
||||
FileReader fReader = new FileReader(dbSettingsPropertyFile);
|
||||
Properties props = new Properties();
|
||||
props.load(fReader); // Load jdbc related properties.
|
||||
|
||||
// Get each property value.
|
||||
impalaDriver = props.getProperty("spring.impala.driver-class-name");
|
||||
if ( !"".equals(impalaDriver) ) { // If not "null" or empty.
|
||||
Class.forName(impalaDriver);
|
||||
impalaConnectionUrl = props.getProperty("spring.impala.url");
|
||||
oldDatabaseName = props.getProperty("spring.impala.oldDatabaseName");
|
||||
databaseName = props.getProperty("spring.impala.databaseName");
|
||||
poolName = props.getProperty("spring.datasource.hikari.pool-name");
|
||||
hikariMaxConnectionPoolSize = Integer.parseInt(props.getProperty("spring.datasource.hikari.maximumPoolSize"));
|
||||
hikariMaxLifetime = Integer.parseInt(props.getProperty("spring.datasource.hikari.maxLifetime"));
|
||||
hikariMinIdleConnections = Integer.parseInt(props.getProperty("spring.datasource.hikari.minimumIdle"));
|
||||
hikariConnectionTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.connectionTimeout"));
|
||||
hikariIdleTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.idleTimeout"));
|
||||
} else
|
||||
throw new RuntimeException("The \"impalaDriver\" was null or empty!");
|
||||
} catch(Exception e) {
|
||||
String errorMsg = "Error when loading the database properties!\n" + e.getMessage();
|
||||
logger.error(errorMsg, e);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(11);
|
||||
}
|
||||
|
||||
try {
|
||||
hikariDataSource = impalaDS();
|
||||
} catch (SQLException | PropertyVetoException e) {
|
||||
logger.error("Problem when creating the Hikari connection pool!", e);
|
||||
if ( jdbcTemplate.getDataSource().getConnection().getMetaData().supportsBatchUpdates() )
|
||||
logger.warn("The database does not support \"BatchUpdates\"!");
|
||||
} catch (Exception e) {
|
||||
logger.error("Error testing if database supports batch updates", e);
|
||||
}
|
||||
|
||||
createDatabase();
|
||||
}
|
||||
|
||||
|
||||
public HikariDataSource impalaDS() throws SQLException, PropertyVetoException
|
||||
{
|
||||
HikariConfig hikariConfig = new HikariConfig();
|
||||
hikariConfig.setDriverClassName(ImpalaConnector.impalaDriver);
|
||||
hikariConfig.setAutoCommit(true);
|
||||
hikariConfig.setJdbcUrl(ImpalaConnector.impalaConnectionUrl);
|
||||
hikariConfig.setPoolName(poolName);
|
||||
hikariConfig.setMaximumPoolSize(hikariMaxConnectionPoolSize);
|
||||
hikariConfig.setMaxLifetime(hikariMaxLifetime);
|
||||
hikariConfig.setMinimumIdle(hikariMinIdleConnections);
|
||||
hikariConfig.setConnectionTimeout(hikariConnectionTimeOut);
|
||||
hikariConfig.setIdleTimeout(hikariIdleTimeOut);
|
||||
return new HikariDataSource(hikariConfig);
|
||||
}
|
||||
|
||||
|
||||
public void createDatabase()
|
||||
{
|
||||
Connection con = getConnection();
|
||||
if ( con == null )
|
||||
System.exit(22);
|
||||
|
||||
try {
|
||||
if ( !con.getMetaData().supportsBatchUpdates() )
|
||||
logger.warn("The database does not support \"BatchUpdates\"!");
|
||||
} catch (SQLException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
|
||||
private void createDatabase() {
|
||||
logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE.");
|
||||
Statement statement = null;
|
||||
try {
|
||||
statement = con.createStatement();
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Problem when creating a connection-statement!\n" + sqle.getMessage());
|
||||
ImpalaConnector.closeConnection(con);
|
||||
System.exit(33);
|
||||
}
|
||||
|
||||
try {
|
||||
statement.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
|
||||
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
|
||||
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication");
|
||||
statement.execute("COMPUTE STATS " + databaseName + ".publication");
|
||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication");
|
||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication");
|
||||
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids");
|
||||
statement.execute("COMPUTE STATS " + databaseName + ".publication_pids");
|
||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids");
|
||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_pids");
|
||||
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls");
|
||||
statement.execute("COMPUTE STATS " + databaseName + ".publication_urls");
|
||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls");
|
||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_urls");
|
||||
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource");
|
||||
statement.execute("COMPUTE STATS " + databaseName + ".datasource");
|
||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource");
|
||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".datasource");
|
||||
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
|
||||
statement.execute("COMPUTE STATS " + databaseName + ".assignment");
|
||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
|
||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment");
|
||||
|
||||
statement.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE");
|
||||
jdbcTemplate.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE");
|
||||
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
|
||||
statement.execute("COMPUTE STATS " + databaseName + ".attempt");
|
||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
|
||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
|
||||
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
|
||||
statement.execute("COMPUTE STATS " + databaseName + ".payload");
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = "Problem when executing the \"create database and create tables queries!\n" + sqle.getMessage() + "\nSQL state: " + sqle.getSQLState() + "\nError code: " + sqle.getErrorCode();
|
||||
logger.error(errorMsg, sqle);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(44);
|
||||
} finally {
|
||||
try {
|
||||
statement.close();
|
||||
con.close();
|
||||
} catch (SQLException sqle2) {
|
||||
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
|
||||
}
|
||||
}
|
||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
|
||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload");
|
||||
|
||||
logger.info("The database \"" + databaseName + "\" and its tables were created or validated.");
|
||||
}
|
||||
|
||||
|
||||
public Connection getConnection()
|
||||
{
|
||||
try {
|
||||
return hikariDataSource.getConnection();
|
||||
//return DriverManager.getConnection(impalaConnectionUrl, null, null); // This is for non pooled connections.
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Problem when connecting with the Impala-database!\n" + sqle.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean testDatabaseAccess()
|
||||
{
|
||||
logger.info("Going to test Impala access..");
|
||||
Connection con = getConnection();
|
||||
if ( con == null )
|
||||
return false;
|
||||
|
||||
ResultSet res = null;
|
||||
try {
|
||||
String tableName = "publication";
|
||||
|
||||
// show tables
|
||||
String sql = "show tables '" + tableName + "'";
|
||||
logger.debug("Running: " + sql);
|
||||
res = con.prepareStatement(sql).executeQuery();
|
||||
if ( res.next() ) {
|
||||
logger.debug(res.getString(1));
|
||||
}
|
||||
|
||||
// describe table
|
||||
sql = "describe " + tableName;
|
||||
logger.debug("Running: " + sql);
|
||||
res = con.prepareStatement(sql).executeQuery();
|
||||
while ( res.next() ) {
|
||||
logger.debug(res.getString(1) + "\t" + res.getString(2));
|
||||
}
|
||||
|
||||
// select * query
|
||||
sql = "select * from " + tableName + " limit 3;";
|
||||
logger.debug("Running: " + sql);
|
||||
res = con.prepareStatement(sql).executeQuery();
|
||||
while ( res.next() ) {
|
||||
logger.debug(res.getString(1));
|
||||
}
|
||||
|
||||
// Get Assignments, only for testing here.
|
||||
//UrlController urlController = new UrlController();
|
||||
//ResponseEntity<?> responseEntity = urlController.getUrls("worker_1", ControllerConstants.ASSIGNMENTS_LIMIT);
|
||||
//logger.debug(responseEntity.toString());
|
||||
|
||||
} catch (SQLException sqle) {
|
||||
logger.error(sqle.getMessage(), sqle);
|
||||
return false;
|
||||
} finally {
|
||||
try {
|
||||
if ( res != null )
|
||||
res.close();
|
||||
con.close();
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Could not close the connection with the Impala-database.\n" + sqle);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public static boolean closeConnection(Connection con) {
|
||||
try {
|
||||
if ( con != null )
|
||||
con.close(); // It may have already closed and that's fine.
|
||||
return true;
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static String handlePreparedStatementException(String queryName, String query, String preparedStatementName, PreparedStatement preparedStatement, Connection con, Exception e)
|
||||
{
|
||||
public static String handlePreparedStatementException(String queryName, String query, Exception e) {
|
||||
String errorMsg = "Problem when creating " + (( ! queryName.startsWith("get")) ? "and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n";
|
||||
logger.error(errorMsg + "\n\n" + query + "\n\n" + e.getMessage(), e);
|
||||
closeConnection(con);
|
||||
logger.error(errorMsg + "\n\n" + query + "\n\n", e);
|
||||
return errorMsg;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java
@@ -1,58 +1,41 @@
package eu.openaire.urls_controller.controllers;

import eu.openaire.urls_controller.configuration.ImpalaConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

/**
 * This controller will test the connectivity with the database and return statistics!
 */
@RestController
@RequestMapping("/impala")
public class ImpalaController {

    // This controller will test the connectivity with the database and return statistics!

    private static final Logger logger = LoggerFactory.getLogger(ImpalaController.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @GetMapping("get10PublicationIdsTest")
    public ResponseEntity<?> get10PublicationIdsTest() {

        Connection con = ImpalaConnector.getInstance().getConnection();
        if ( con == null )
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");

        String query = "SELECT id FROM publication LIMIT 10;";

        try ( ResultSet res = con.prepareStatement(query).executeQuery()) {
            if ( !res.first() ) {
                String errorMsg = "No results retrieved from the \"getAssignmentsQuery\"!";
                logger.error(errorMsg);
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
            }

            List<String> publications = new ArrayList<>();
            do {
                publications.add(res.getString(0));
            } while ( res.next() );

            return new ResponseEntity<String>(publications.toString(), HttpStatus.OK);
        try {
            List<String> publications = jdbcTemplate.queryForList(query, String.class);

            return new ResponseEntity<>(publications.toString(), HttpStatus.OK);
        } catch (Exception e) {
            String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query;
            logger.error(errorMsg, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            ImpalaConnector.closeConnection(con);
        }
    }

}
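Throughout this commit, hand-managed Connection/Statement/PreparedStatement code is replaced with Spring's JdbcTemplate, as in the controller above and in ImpalaConnector and UrlController below. For reference, a minimal self-contained sketch of the three JdbcTemplate calls the new code relies on; the class name and the database/table names are illustrative only, not taken from this repository:

    import java.sql.Types;
    import java.util.List;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Component;

    @Component
    class JdbcTemplateUsageSketch { // hypothetical example class, not part of this commit

        @Autowired
        private JdbcTemplate jdbcTemplate;

        void examples() {
            // Statements without a result set (DDL, COMPUTE STATS, etc.):
            jdbcTemplate.execute("COMPUTE STATS some_db.some_table");

            // Single-column queries mapped directly to a List:
            List<String> ids = jdbcTemplate.queryForList("SELECT id FROM some_db.some_table LIMIT 10", String.class);
            System.out.println(ids);

            // Parameterized inserts, with an explicit SQL type for each placeholder:
            jdbcTemplate.update("INSERT INTO some_db.some_table (id, name) VALUES (?, ?)",
                    new Object[]{"id-1", "name-1"},
                    new int[]{Types.VARCHAR, Types.VARCHAR});
        }
    }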
src/main/java/eu/openaire/urls_controller/controllers/TestController.java
@@ -0,0 +1,82 @@
package eu.openaire.urls_controller.controllers;

import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.models.Assignment;
import eu.openaire.urls_controller.models.Datasource;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.TestFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

@RestController
public class TestController extends GeneralController {

    private static final Logger logger = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private TestFileUtils fileUtils;

    @Value("services.pdfaggregation.controller.assignmentLimit")
    private int assignmentLimit;

    private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.

    @GetMapping("test")
    public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {

        logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + this.assignmentLimit);

        List<Assignment> assignments = new ArrayList<>();
        HashMultimap<String, String> loadedIdUrlPairs;
        boolean isFirstRun = true;
        boolean assignmentsLimitReached = false;
        Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.

        // Start loading urls.
        while ( true ) {
            loadedIdUrlPairs = fileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.

            if ( fileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
                break;
            else
                isFirstRun = false;

            Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();

            for ( Map.Entry<String,String> pair : pairs ) {
                if ( assignments.size() >= workerAssignmentsLimit ) {
                    assignmentsLimitReached = true;
                    break;
                }

                int randomNum = GenericUtils.getRandomNumber(1, 5);
                assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
            }// end pairs-for-loop

            if ( assignmentsLimitReached ) {
                logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
                break;
            }
        }// end loading-while-loop

        Scanner scanner = fileUtils.inputScanner.get();
        if ( scanner != null ) // Check if the initial value is null.
            scanner.close();

        long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
        logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + fileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
        return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
    }
}
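Since the build now pulls in spring-boot-starter-test (see the build.gradle change above), the new test endpoint can be exercised with a standard Spring Boot integration test. A minimal sketch, assuming a random-port test setup; the test class name and the requested limit are illustrative:

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.boot.test.web.client.TestRestTemplate;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;

    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
    class TestControllerSketchIT { // hypothetical test class, not part of this commit

        @Autowired
        private TestRestTemplate restTemplate;

        @Test
        void getTestUrlsReturnsAssignments() {
            // Asks the "test" endpoint for up to 10 test-assignments for an illustrative worker id.
            ResponseEntity<String> response =
                    restTemplate.getForEntity("/test?workerId=worker_1&workerAssignmentsLimit=10", String.class);
            assertEquals(HttpStatus.OK, response.getStatusCode());
        }
    }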
src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
@@ -1,23 +1,25 @@
|
|||
package eu.openaire.urls_controller.controllers;
|
||||
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import eu.openaire.urls_controller.configuration.ImpalaConnector;
|
||||
import eu.openaire.urls_controller.models.Error;
|
||||
import eu.openaire.urls_controller.models.*;
|
||||
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
|
||||
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
|
||||
import eu.openaire.urls_controller.util.ControllerConstants;
|
||||
import eu.openaire.urls_controller.util.FileUtils;
|
||||
import eu.openaire.urls_controller.util.GenericUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowCallbackHandler;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.sql.*;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@@ -27,11 +29,20 @@ public class UrlController {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Autowired
|
||||
private FileUtils fileUtils;
|
||||
|
||||
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
|
||||
|
||||
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
|
||||
private static int maxAttemptsPerRecord = ControllerConstants.MAX_ATTEMPTS_PER_RECORD;
|
||||
|
||||
@Value("services.pdfaggregation.controller.maxAttemptsPerRecord")
|
||||
private int maxAttemptsPerRecord;
|
||||
|
||||
@Value("services.pdfaggregation.controller.assignmentLimit")
|
||||
private int assignmentLimit;
|
||||
|
||||
@GetMapping("")
|
||||
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
|
||||
|
@@ -43,7 +54,7 @@ public class UrlController {
|
|||
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
|
||||
}
|
||||
|
||||
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
|
||||
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);
|
||||
|
||||
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
|
||||
|
||||
|
@@ -53,9 +64,9 @@ public class UrlController {
|
|||
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
|
||||
logger.error(errorMsg);
|
||||
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
||||
} else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
|
||||
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
|
||||
assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
|
||||
} else if ( assignmentsLimit > assignmentLimit ) {
|
||||
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
|
||||
assignmentsLimit = assignmentLimit;
|
||||
}
|
||||
|
||||
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
|
||||
|
@@ -80,129 +91,74 @@ public class UrlController {
|
|||
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
|
||||
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
|
||||
|
||||
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
|
||||
List<Assignment> assignments = new ArrayList<>();
|
||||
|
||||
ImpalaConnector.databaseLock.lock();
|
||||
Connection con = ImpalaConnector.getInstance().getConnection();
|
||||
if ( con == null ) { // This is already logged in "getConnection()".
|
||||
|
||||
try {
|
||||
jdbcTemplate.execute(createAssignmentsQuery);
|
||||
} catch (Exception sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
|
||||
}
|
||||
|
||||
// All transactions in Impala automatically commit at the end of the statement. Currently, Impala does not support multi-statement transactions.
|
||||
// https://impala.apache.org/docs/build/html/topics/impala_transactions.html
|
||||
// We cannot use "savePoints" along with "autoCommit = false" to roll back to a previous state among multiple statements.
|
||||
|
||||
PreparedStatement createCurrentAssignmentsPreparedStatement = null;
|
||||
try {
|
||||
createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery);
|
||||
// We cannot set the "limits" and the MAX_ATTEMPTS_PER_RECORD as preparedStatements parameters, as we get a "java.sql.SQLException: [Simba][JDBC](11420) Error, parameter metadata not populated."
|
||||
createCurrentAssignmentsPreparedStatement.execute();
|
||||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
} finally {
|
||||
try {
|
||||
if ( createCurrentAssignmentsPreparedStatement != null )
|
||||
createCurrentAssignmentsPreparedStatement.close();
|
||||
} catch (SQLException sqle2) {
|
||||
logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
|
||||
}
|
||||
}
|
||||
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
|
||||
} catch (Exception sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable();
|
||||
|
||||
PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null;
|
||||
try {
|
||||
computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery);
|
||||
computeCurrentAssignmentsStatsPreparedStatement.execute();
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
} finally {
|
||||
try {
|
||||
if ( computeCurrentAssignmentsStatsPreparedStatement != null )
|
||||
computeCurrentAssignmentsStatsPreparedStatement.close();
|
||||
} catch (SQLException sqle2) {
|
||||
logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
PreparedStatement getAssignmentsPreparedStatement = null;
|
||||
try {
|
||||
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
|
||||
// The "getAssignmentsPreparedStatement" will always be null here, so we do not close it.
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
}
|
||||
|
||||
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
|
||||
|
||||
try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
|
||||
// Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it.
|
||||
/*if ( !resultSet.first() ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
|
||||
logger.error(errorMsg);
|
||||
ImpalaConnector.closeConnection(con);
|
||||
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
||||
}*/
|
||||
|
||||
// The cursor is automatically before the first element in this configuration.
|
||||
while ( resultSet.next() ) { // Move the cursor forward.
|
||||
// If the resultsSet is empty, then the control will never get inside the loop.
|
||||
// The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values.
|
||||
Assignment assignment = new Assignment();
|
||||
assignment.setWorkerId(workerId);
|
||||
assignment.setTimestamp(timestamp);
|
||||
Datasource datasource = new Datasource();
|
||||
try { // For each of the 4 columns returned. The indexing starts from 1
|
||||
assignment.setId(resultSet.getString(1));
|
||||
assignment.setOriginalUrl(resultSet.getString(2));
|
||||
datasource.setId(resultSet.getString(3));
|
||||
datasource.setName(resultSet.getString(4));
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle);
|
||||
continue; // This object is broken, move to the next row.
|
||||
try {
|
||||
jdbcTemplate.query(getAssignmentsQuery, new RowCallbackHandler() {
|
||||
@Override
|
||||
public void processRow(ResultSet rs) throws SQLException {
|
||||
Assignment assignment = new Assignment();
|
||||
assignment.setWorkerId(workerId);
|
||||
assignment.setTimestamp(timestamp);
|
||||
Datasource datasource = new Datasource();
|
||||
try { // For each of the 4 columns returned. The indexing starts from 1
|
||||
assignment.setId(rs.getString(1));
|
||||
assignment.setOriginalUrl(rs.getString(2));
|
||||
datasource.setId(rs.getString(3));
|
||||
datasource.setName(rs.getString(4));
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
|
||||
}
|
||||
assignment.setDatasource(datasource);
|
||||
assignments.add(assignment);
|
||||
}
|
||||
assignment.setDatasource(datasource);
|
||||
assignments.add(assignment);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
String errorMsg = dropCurrentAssignmentTable();
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
|
||||
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
|
||||
logger.error(errorMsg, e);
|
||||
ImpalaConnector.closeConnection(con);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
} finally {
|
||||
try {
|
||||
getAssignmentsPreparedStatement.close();
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
int assignmentsSize = assignments.size();
|
||||
if ( assignmentsSize == 0 ) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
String errorMsg = dropCurrentAssignmentTable();
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
|
||||
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.";
|
||||
logger.error(errorMsg);
|
||||
ImpalaConnector.closeConnection(con);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
||||
} else if ( assignmentsSize < assignmentsLimit ) {
|
||||
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
|
||||
|
@@ -218,48 +174,37 @@ public class UrlController {
|
|||
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
|
||||
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
|
||||
|
||||
PreparedStatement insertAssignmentsPreparedStatement = null;
|
||||
try {
|
||||
insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery);
|
||||
insertAssignmentsPreparedStatement.execute();
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
jdbcTemplate.execute(insertAssignmentsQuery);
|
||||
} catch (Exception sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable();
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, sqle);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
} finally {
|
||||
try {
|
||||
if ( insertAssignmentsPreparedStatement != null )
|
||||
insertAssignmentsPreparedStatement.close();
|
||||
} catch (SQLException sqle2) {
|
||||
logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
String errorMsg = dropCurrentAssignmentTable();
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
|
||||
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
|
||||
|
||||
String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, "", null);
|
||||
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
|
||||
if ( mergeErrorMsg != null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
||||
}
|
||||
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
|
||||
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
|
||||
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
|
||||
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
|
||||
}
|
||||
|
||||
|
||||
@PostMapping("addWorkerReport")
|
||||
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
|
||||
|
||||
|
@@ -294,52 +239,22 @@ public class UrlController {
|
|||
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
|
||||
|
||||
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
||||
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
|
||||
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
|
||||
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
|
||||
}
|
||||
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
|
||||
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
|
||||
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
|
||||
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
||||
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
||||
}
|
||||
|
||||
ImpalaConnector.databaseLock.lock();
|
||||
Connection con = ImpalaConnector.getInstance().getConnection();
|
||||
if ( con == null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
|
||||
}
|
||||
|
||||
// Store the workerReport into the database.
|
||||
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
||||
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
|
||||
|
||||
String tempInsertQueryName = null;
|
||||
PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
|
||||
try {
|
||||
tempInsertQueryName = "insertIntoPayloadBaseQuery";
|
||||
preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
|
||||
tempInsertQueryName = "insertIntoAttemptBaseQuery";
|
||||
preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
|
||||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
|
||||
logger.error(errorMsg + sqle.getMessage());
|
||||
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
}
|
||||
|
||||
try {
|
||||
con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end.
|
||||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
|
||||
logger.error(errorMsg + sqle.getMessage());
|
||||
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
}
|
||||
|
||||
String payloadErrorMsg = null;
|
||||
int failedCount = 0;
|
||||
|
||||
|
@@ -349,32 +264,27 @@ public class UrlController {
|
|||
|
||||
for ( UrlReport urlReport : urlReports ) {
|
||||
Payload payload = urlReport.getPayload();
|
||||
|
||||
if ( payload == null ) {
|
||||
logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
|
||||
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
|
||||
payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
||||
preparedInsertPayloadStatement.setString(1, payload.getId());
|
||||
preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
|
||||
preparedInsertPayloadStatement.setString(3, payload.getActual_url());
|
||||
preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
|
||||
preparedInsertPayloadStatement.setString(5, payload.getMime_type());
|
||||
Object[] args = new Object[] {
|
||||
payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(),
|
||||
payload.getMime_type(), payload.getSize() != null?String.valueOf(payload.getSize()):null, payload.getHash(),
|
||||
payload.getLocation(), payload.getProvenance()};
|
||||
int[] argTypes = new int[] {
|
||||
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
|
||||
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
|
||||
|
||||
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
|
||||
String sizeStr = null;
|
||||
Long size = payload.getSize();
|
||||
if ( size != null )
|
||||
sizeStr = String.valueOf(size);
|
||||
jdbcTemplate.update(insertIntoPayloadBaseQuery, args, argTypes);
|
||||
|
||||
preparedInsertPayloadStatement.setString(6, sizeStr);
|
||||
preparedInsertPayloadStatement.setString(7, payload.getHash());
|
||||
preparedInsertPayloadStatement.setString(8, payload.getLocation());
|
||||
preparedInsertPayloadStatement.setString(9, payload.getProvenance());
|
||||
preparedInsertPayloadStatement.executeUpdate();
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
|
||||
} catch (Exception sqle) {
|
||||
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": ", sqle);
|
||||
}
|
||||
|
||||
Error error = urlReport.getError();
|
||||
|
@@ -384,81 +294,58 @@ public class UrlController {
|
|||
}
|
||||
|
||||
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
||||
preparedInsertAttemptStatement.setString(1, payload.getId());
|
||||
preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
|
||||
preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
|
||||
preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
|
||||
preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null" too.
|
||||
preparedInsertAttemptStatement.setString(6, error.getMessage());
|
||||
preparedInsertAttemptStatement.executeUpdate();
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
|
||||
Object[] args = new Object[] {
|
||||
payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
|
||||
urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
|
||||
int[] argTypes = new int[] {
|
||||
Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
|
||||
Types.VARCHAR};
|
||||
|
||||
jdbcTemplate.update(insertIntoAttemptBaseQuery, args, argTypes);
|
||||
} catch (Exception sqle) {
|
||||
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": ", sqle.getMessage());
|
||||
}
|
||||
}//end for-loop
|
||||
|
||||
try {
|
||||
con.commit(); // Commit all the insert-queries to the database (write them to disk).
|
||||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
|
||||
logger.error(errorMsg + "\n" + sqle.getMessage());
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
} finally {
|
||||
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward.
|
||||
}
|
||||
|
||||
if ( payloadErrorMsg != null )
|
||||
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables.");
|
||||
else
|
||||
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
|
||||
|
||||
String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con, "", null);
|
||||
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
|
||||
if ( mergeErrorMsg != null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
||||
}
|
||||
|
||||
mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con, "", null);
|
||||
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
|
||||
if ( mergeErrorMsg != null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
||||
}
|
||||
|
||||
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
||||
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
||||
// We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
|
||||
mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId);
|
||||
mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
|
||||
if ( mergeErrorMsg != null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
||||
}
|
||||
|
||||
try {
|
||||
con.commit(); // Apply the merges permanently (write them to disk).
|
||||
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = "Problem when committing changes to the database!";
|
||||
logger.error(errorMsg + "\n" + sqle.getMessage());
|
||||
// The statements used in "mergeParquetFiles()" are already closed.
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
} finally {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
}
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
|
||||
logger.debug("Finished merging the database tables.");
|
||||
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
|
||||
}
|
||||
|
||||
|
||||
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
|
||||
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
|
||||
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
|
||||
{
|
||||
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
|
||||
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
|
||||
sb.append(baseInsertQuery);
|
||||
for ( int i=1; i <= dataSize; ++i ) {
|
||||
|
@@ -484,98 +371,15 @@ public class UrlController {
|
|||
return preparedInsertStatement;
|
||||
}
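The comment above notes that batch execution does not work with this Impala JDBC driver, which is why the worker-report rows are inserted one at a time and why this "giant-query" fallback exists. For comparison only, the usual Spring idiom when the driver does support batching is JdbcTemplate.batchUpdate; a hypothetical fragment, reusing the autowired jdbcTemplate field above, with an illustrative table and values:

    // Hypothetical sketch, not part of this commit: batch several parameterized inserts into one
    // round-trip, provided the underlying JDBC driver supports batch updates (this Impala driver does not).
    List<Object[]> batchArgs = new ArrayList<>(); // java.util.List / java.util.ArrayList
    batchArgs.add(new Object[]{"id-1", "http://example.org/file1.pdf"});
    batchArgs.add(new Object[]{"id-2", "http://example.org/file2.pdf"});
    jdbcTemplate.batchUpdate("INSERT INTO some_db.some_table (id, original_url) VALUES (?, ?)", batchArgs);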
|
||||
|
||||
|
||||
private boolean closeStatements(Statement statement1, Statement statement2, Connection con) {
|
||||
try {
|
||||
if ( statement1 != null )
|
||||
statement1.close();
|
||||
if ( statement2 != null )
|
||||
statement2.close();
|
||||
if ( con != null )
|
||||
con.close(); // It may have already closed and that's fine.
|
||||
return true;
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Could not close the statements or the connection with the Impala-database.\n" + sqle.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@GetMapping("test")
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {

logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);

try {
new FileUtils(); // Find the input file.
} catch (Exception e) {
logger.error(e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
}

List<Assignment> assignments = new ArrayList<>();
HashMultimap<String, String> loadedIdUrlPairs;
boolean isFirstRun = true;
boolean assignmentsLimitReached = false;
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.

// Start loading urls.
while ( true ) {
loadedIdUrlPairs = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.

if ( FileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
break;
else
isFirstRun = false;

Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();

for ( Map.Entry<String,String> pair : pairs )
{
if ( assignments.size() >= workerAssignmentsLimit ) {
assignmentsLimitReached = true;
break;
}

int randomNum = GenericUtils.getRandomNumber(1, 5);
assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
}// end pairs-for-loop

if ( assignmentsLimitReached ) {
logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
break;
}
}// end loading-while-loop

Scanner scanner = FileUtils.inputScanner.get();
if ( scanner != null ) // Check if the initial value is null.
scanner.close();

long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}

private String dropCurrentAssignmentTable(Connection con)
{
private String dropCurrentAssignmentTable() {
String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
PreparedStatement dropCurrentAssignmentsPreparedStatement = null;

try {
dropCurrentAssignmentsPreparedStatement = con.prepareStatement(dropCurrentAssignmentsQuery);
dropCurrentAssignmentsPreparedStatement.execute();
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
return null;
} catch (SQLException sqle) {
} catch (Exception sqle) {
ImpalaConnector.databaseLock.unlock();
return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, "dropCurrentAssignmentsPreparedStatement", dropCurrentAssignmentsPreparedStatement, con, sqle);
} finally {
try {
if ( dropCurrentAssignmentsPreparedStatement != null )
dropCurrentAssignmentsPreparedStatement.close();
} catch (SQLException sqle2) {
logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
}
return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, sqle);
}
}

}
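
Taking only the added lines of this hunk together, the reworked method appears to collapse to roughly the following (a sketch reconstructed from the diff above, not an authoritative copy of the merged file):

// Sketch of the JdbcTemplate-based variant this diff moves towards; identifiers are the ones used above.
private String dropCurrentAssignmentTable() {
    String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
    try {
        jdbcTemplate.execute(dropCurrentAssignmentsQuery); // JdbcTemplate acquires and releases the connection itself.
        return null; // No error message: the table was dropped.
    } catch (Exception e) {
        ImpalaConnector.databaseLock.unlock();
        return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e);
    }
}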

@ -1,10 +0,0 @@
package eu.openaire.urls_controller.util;


public interface ControllerConstants {

int ASSIGNMENTS_LIMIT = 100_000; // The upper assignments-limit the Controller can handle. If the worker's limit is above this one, then the controller's limit is used. Otherwise, the worker's limit will be applied.

int MAX_ATTEMPTS_PER_RECORD = 3; // The maximum times a record can be processed, if each of the previous times failed with a "couldRetry" Error-Class.

}

@ -2,6 +2,7 @@ package eu.openaire.urls_controller.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileInputStream;

@ -12,20 +13,16 @@ import java.nio.file.StandardCopyOption;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;


@Component
public class FileUnZipper {

private static final Logger logger = LoggerFactory.getLogger(FileUnZipper.class);


public static void unzipFolder(Path source, Path target) throws Exception
{
try ( ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())) )
{
public void unzipFolder(Path source, Path target) throws Exception {
try ( ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())) ) {
// Iterate over the files in zip and un-zip them.
ZipEntry zipEntry = zis.getNextEntry();
while ( zipEntry != null )
{
while ( zipEntry != null ) {
Path targetPath = zipSlipProtect(zipEntry, target);

if ( zipEntry.getName().endsWith(File.separator) ) // If we have a directory.

@ -44,10 +41,8 @@ public class FileUnZipper {
}
}


// Protect from a Zip Slip attack: https://snyk.io/research/zip-slip-vulnerability
public static Path zipSlipProtect(ZipEntry zipEntry, Path targetDir) throws IOException
{
public Path zipSlipProtect(ZipEntry zipEntry, Path targetDir) throws IOException {
Path targetDirResolved = targetDir.resolve(zipEntry.getName());
// Make sure normalized file still has targetDir as its prefix, else throw an exception.
Path normalizePath = targetDirResolved.normalize();

@ -56,5 +51,4 @@ public class FileUnZipper {
}
return normalizePath;
}

}
}
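
A brief, hypothetical usage sketch of the now instance-based, @Component-managed unzipper (the injected field name, method and paths are illustrative, not part of this diff):

@Autowired
private FileUnZipper fileUnZipper; // Injected, since unzipFolder() is no longer static.

public void extractBatchZip(String zipFileFullPath, String targetDirectory) throws Exception {
    // zipSlipProtect() is applied to every entry inside unzipFolder(), so no entry can escape the target directory.
    fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), Paths.get(targetDirectory));
}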
|
|
@ -3,12 +3,15 @@ package eu.openaire.urls_controller.util;
|
|||
import com.google.common.collect.HashMultimap;
|
||||
import eu.openaire.urls_controller.configuration.ImpalaConnector;
|
||||
import eu.openaire.urls_controller.models.Payload;
|
||||
import eu.openaire.urls_controller.models.Task;
|
||||
import eu.openaire.urls_controller.models.UrlReport;
|
||||
import org.codehaus.groovy.syntax.Types;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.configurationprocessor.json.JSONException;
|
||||
import org.springframework.boot.configurationprocessor.json.JSONObject;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.io.*;
|
||||
|
@ -17,51 +20,38 @@ import java.net.URL;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.sql.*;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@Component
|
||||
public class FileUtils {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
|
||||
|
||||
public static ThreadLocal<Scanner> inputScanner = new ThreadLocal<Scanner>(); // Every Thread has its own variable.
|
||||
private static final ThreadLocal<Integer> fileIndex = new ThreadLocal<Integer>();
|
||||
private static final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<Integer>();
|
||||
public static ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<Integer>();
|
||||
public static final int jsonBatchSize = 3000;
|
||||
private static final String utf8Charset = "UTF-8";
|
||||
public static String inputFileFullPath;
|
||||
private static final String workingDir = System.getProperty("user.dir") + File.separator;
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Autowired
|
||||
private S3ObjectStore s3ObjectStore;
|
||||
|
||||
public FileUtils() throws RuntimeException
|
||||
{
|
||||
inputFileFullPath = workingDir + "src" + File.separator + "main" + File.separator + "resources";
|
||||
String resourceFileName = "testInputFiles" + File.separator + "orderedList1000.json";
|
||||
inputFileFullPath += File.separator + resourceFileName;
|
||||
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourceFileName);
|
||||
if ( inputStream == null )
|
||||
throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");
|
||||
@Autowired
|
||||
private FileUnZipper fileUnZipper;
|
||||
|
||||
logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);
|
||||
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
|
||||
|
||||
FileUtils.inputScanner.set(new Scanner(inputStream, utf8Charset));
|
||||
fileIndex.set(0); // Re-initialize the file-number-pointer.
|
||||
unretrievableInputLines.set(0);
|
||||
duplicateIdUrlEntries.set(0);
|
||||
public FileUtils() throws RuntimeException {
|
||||
}
|
||||
|
||||
|
||||
/**
 * In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast selects, so we have to stick with it and merge those files.
 * This method creates a clone of the original table, in order to have only one parquet file in the end, then drops the original table
 * and renames the clone to the original's name.
 * It returns the errorMsg if an error appears; otherwise it returns null.
 * */
public static String mergeParquetFiles(String tableName, Connection con, String whereClause, String parameter)
{
public String mergeParquetFiles(String tableName, String whereClause, String parameter) {
|
||||
String errorMsg;
|
||||
if ( tableName == null ) {
|
||||
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
|
||||
|
@ -78,42 +68,29 @@ public class FileUtils {
|
|||
else
|
||||
parameter = " '" + parameter + "'"; // This will be a "string-check".
|
||||
|
||||
Statement statement;
|
||||
try {
|
||||
statement = con.createStatement();
|
||||
} catch (SQLException sqle) {
|
||||
errorMsg = "Problem when creating a connection-statement!\n";
|
||||
logger.error(errorMsg + sqle.getMessage());
|
||||
return errorMsg;
|
||||
}
|
||||
|
||||
try {
|
||||
statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
|
||||
statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
|
||||
statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
|
||||
statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
|
||||
} catch (SQLException sqle) {
|
||||
jdbcTemplate.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
|
||||
jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
|
||||
jdbcTemplate.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
|
||||
jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
|
||||
} catch (DataAccessException e) {
|
||||
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
|
||||
logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()), sqle);
|
||||
logger.error(errorMsg, e);
|
||||
return errorMsg;
|
||||
} finally {
|
||||
// Make sure we close the statement.
|
||||
try { statement.close(); }
|
||||
catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
|
||||
}
|
||||
|
||||
return null; // No errorMsg, everything is fine.
|
||||
}
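
As a usage sketch (a hypothetical call site; the real callers live elsewhere in the controller), the clone-drop-rename merge would be invoked after a burst of small parquet inserts roughly like this:

// Hypothetical caller - compact the many small parquet files of the "payload" table into one.
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null); // Empty whereClause: merge the whole table.
if ( mergeErrorMsg != null ) {
    logger.error(mergeErrorMsg);
    // Treat this as a database error and abort the current batch.
}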
|
||||
|
||||
private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
|
||||
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
|
||||
|
||||
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError};
|
||||
private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
|
||||
private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
|
||||
public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
|
||||
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
|
||||
@Value("services.pdfaggregation.controller.baseTargetLocation")
|
||||
private String baseTargetLocation;
|
||||
|
||||
public static UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
|
||||
{
|
||||
private final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
|
||||
|
||||
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) {
|
||||
// The Controller has to request the files from the Worker, in order to upload them to the S3.
|
||||
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
|
||||
|
||||
|
@ -126,30 +103,15 @@ public class FileUtils {
|
|||
remoteAddr = request.getRemoteAddr();
|
||||
|
||||
ImpalaConnector.databaseLock.lock();
|
||||
Connection con = ImpalaConnector.getInstance().getConnection();
|
||||
if ( con == null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
logger.error("Problem when creating the Impala-connection!");
|
||||
return UploadFullTextsResponse.databaseError;
|
||||
}
|
||||
|
||||
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
|
||||
PreparedStatement getFileLocationForHashPreparedStatement = null;
|
||||
try {
|
||||
getFileLocationForHashPreparedStatement = con.prepareStatement(getFileLocationForHashQuery);
|
||||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
|
||||
return UploadFullTextsResponse.databaseError;
|
||||
}
|
||||
|
||||
// Get the file-locations.
|
||||
int numFullTextUrlsFound = 0;
|
||||
int numFilesFoundFromPreviousAssignmentsBatches = 0;
|
||||
HashMultimap<String, String> allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
|
||||
|
||||
for ( UrlReport urlReport : urlReports )
|
||||
{
|
||||
for ( UrlReport urlReport : urlReports ) {
|
||||
UrlReport.StatusType statusType = urlReport.getStatus();
|
||||
if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) ) {
|
||||
continue;
|
||||
|
@ -160,7 +122,7 @@ public class FileUtils {
|
|||
if ( payload == null )
|
||||
continue;
|
||||
|
||||
String fileLocation = null;
|
||||
String fileLocation;
|
||||
|
||||
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
|
||||
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
|
||||
|
@ -168,32 +130,24 @@ public class FileUtils {
|
|||
// Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB).
|
||||
String fileHash = payload.getHash();
|
||||
if ( fileHash != null ) {
|
||||
try {
|
||||
getFileLocationForHashPreparedStatement.setString(1, fileHash);
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage());
|
||||
|
||||
fileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, new int[] {Types.STRING}, String.class);
|
||||
|
||||
if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded.
|
||||
payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
|
||||
|
||||
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG!
|
||||
numFilesFoundFromPreviousAssignmentsBatches ++;
|
||||
|
||||
continue; // Do not request the file from the worker, it's already uploaded. Move on.
|
||||
}
|
||||
|
||||
try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) {
|
||||
if ( resultSet.next() ) { // Move the "cursor" to the first row. If there is any data, then take the first result (there should not be more, but we still want the first anyway).
|
||||
fileLocation = resultSet.getString(1);
|
||||
if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded.
|
||||
payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
|
||||
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG!
|
||||
numFilesFoundFromPreviousAssignmentsBatches ++;
|
||||
continue; // Do not request the file from the worker, it's already uploaded. Move on.
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
|
||||
|
||||
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
|
||||
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
|
||||
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection.
|
||||
}
|
||||
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
|
||||
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
|
||||
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection.
|
||||
}
|
||||
|
||||
// If the full-text of this record was not found by a previous batch..
|
||||
// If the full-text of this record was not found by a previous batch...
|
||||
fileLocation = payload.getLocation();
|
||||
if ( fileLocation != null ) { // If the docFile was downloaded (without an error)..
|
||||
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
|
||||
|
@ -209,16 +163,7 @@ public class FileUtils {
|
|||
}
|
||||
}
|
||||
|
||||
// Close the Prepared Statement.
|
||||
try {
|
||||
if ( getFileLocationForHashPreparedStatement != null )
|
||||
getFileLocationForHashPreparedStatement.close();
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
|
||||
} finally {
|
||||
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
|
||||
ImpalaConnector.closeConnection(con);
|
||||
}
|
||||
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
|
||||
|
||||
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
|
||||
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
|
||||
|
@ -252,8 +197,7 @@ public class FileUtils {
|
|||
File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);
|
||||
|
||||
int failedBatches = 0;
|
||||
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter )
|
||||
{
|
||||
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
|
||||
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
|
||||
HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
|
||||
if ( conn == null ) {
|
||||
|
@ -280,7 +224,7 @@ public class FileUtils {
|
|||
|
||||
//logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
|
||||
|
||||
FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
|
||||
fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
|
||||
|
||||
String[] fileNames = new File(targetDirectory).list();
|
||||
if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file".
|
||||
|
@ -311,7 +255,7 @@ public class FileUtils {
|
|||
// At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName.
|
||||
// Let's try to upload the file to S3 and update the payloads of all related IDs, either in successful upload or not.
|
||||
|
||||
String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath);
|
||||
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
|
||||
if ( s3Url != null ) {
|
||||
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); // It checks whether (s3Url != null) and acts accordingly.
|
||||
numUploadedFiles ++;
|
||||
|
@ -342,9 +286,7 @@ public class FileUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId)
|
||||
{
|
||||
private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) {
|
||||
baseUrl += batchNum + "/";
|
||||
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
|
||||
logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
|
||||
|
@ -366,9 +308,7 @@ public class FileUtils {
|
|||
return conn;
|
||||
}
|
||||
|
||||
|
||||
private static String getErrorMessageFromResponseBody(HttpURLConnection conn)
|
||||
{
|
||||
private String getErrorMessageFromResponseBody(HttpURLConnection conn) {
|
||||
StringBuilder errorMsgStrB = new StringBuilder(500);
|
||||
try ( BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream())) ) { // Try-with-resources
|
||||
String inputLine;
|
||||
|
@ -387,9 +327,7 @@ public class FileUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
|
||||
{
|
||||
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch) {
|
||||
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
|
||||
int endingIndex = (curBatch * numOfFullTextsPerBatch);
|
||||
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
|
||||
|
@ -406,12 +344,9 @@ public class FileUtils {
|
|||
return fileNamesOfCurBatch;
|
||||
}
|
||||
|
||||
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) {
|
||||
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
|
||||
|
||||
private static final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
|
||||
// TODO - Make it THREAD-LOCAL, if we move to multi-thread batch requests.
|
||||
|
||||
private static String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
|
||||
{
|
||||
sb.append(baseUrl);
|
||||
int numFullTextsCurBatch = fileNamesForCurBatch.size();
|
||||
for ( int j=0; j < numFullTextsCurBatch; ++j ){
|
||||
|
@ -420,14 +355,13 @@ public class FileUtils {
|
|||
sb.append(",");
|
||||
}
|
||||
String requestUrl = sb.toString();
|
||||
sb.setLength(0); // Reset for the next batch.
|
||||
|
||||
return requestUrl;
|
||||
}
|
||||
|
||||
private final int bufferSize = 20971520; // 20 MB
|
||||
|
||||
private static final int bufferSize = 20971520; // 20 MB
|
||||
public static boolean saveZipFile(HttpURLConnection conn, File zipFile)
|
||||
{
|
||||
public boolean saveZipFile(HttpURLConnection conn, File zipFile) {
|
||||
InputStream inStream = null;
|
||||
FileOutputStream outStream = null;
|
||||
try {
|
||||
|
@ -454,9 +388,7 @@ public class FileUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap)
|
||||
{
|
||||
private boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap) {
|
||||
// Get the ID of the file.
|
||||
Matcher matcher = FILENAME_ID.matcher(fileName);
|
||||
if ( !matcher.matches() ) {
|
||||
|
@ -492,16 +424,13 @@ public class FileUtils {
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method updates the UrlReports to not point to any downloaded fullText files.
|
||||
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
|
||||
* Then, we don't want any "links" to locally stored files, which will be deleted.
|
||||
* @param urlReports
|
||||
* @return
|
||||
*/
|
||||
public static void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports)
|
||||
{
|
||||
public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports) {
|
||||
for ( UrlReport urlReport : urlReports ) {
|
||||
Payload payload = urlReport.getPayload();
|
||||
if ( payload != null )
|
||||
|
@ -509,22 +438,19 @@ public class FileUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static void replaceNotUploadedFileLocations(List<UrlReport> urlReports)
|
||||
{
|
||||
private void replaceNotUploadedFileLocations(List<UrlReport> urlReports) {
|
||||
for ( UrlReport urlReport : urlReports ) {
|
||||
Payload payload = urlReport.getPayload();
|
||||
if ( payload != null ) {
|
||||
String fileLocation = payload.getLocation();
|
||||
if ( (fileLocation != null) && (! fileLocation.startsWith(S3ObjectStoreMinIO.s3Protocol)) )
|
||||
|
||||
if ( (fileLocation != null) && (! s3ObjectStore.locationInStore(fileLocation)) )
|
||||
setUnretrievedFullText(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames)
|
||||
{
|
||||
public void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames) {
|
||||
for ( String fileName : fileNames ) {
|
||||
// Get the ID of the file.
|
||||
Matcher matcher = FILENAME_ID.matcher(fileName);
|
||||
|
@ -543,9 +469,7 @@ public class FileUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public static void setUnretrievedFullText(Payload payload)
|
||||
{
|
||||
public void setUnretrievedFullText(Payload payload) {
|
||||
// Mark the full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
|
||||
payload.setLocation(null);
|
||||
payload.setHash(null);
|
||||
|
@ -553,15 +477,13 @@ public class FileUtils {
|
|||
payload.setSize(null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads.
|
||||
* @param fileIDs
|
||||
* @param payloadsHashMultimap
|
||||
* @param s3Url
|
||||
*/
|
||||
public static void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url)
|
||||
{
|
||||
public void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url) {
|
||||
for ( String id : fileIDs ) {
|
||||
Set<Payload> payloads = payloadsHashMultimap.get(id);
|
||||
if ( payloads.isEmpty() ) {
|
||||
|
@ -575,8 +497,7 @@ public class FileUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public static boolean deleteDirectory(File curBatchDir) {
|
||||
public boolean deleteDirectory(File curBatchDir) {
|
||||
try {
|
||||
org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
|
||||
return true;
|
||||
|
@ -585,136 +506,4 @@ public class FileUtils {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static String getCutBatchExceptionMessage(String sqleMessage)
|
||||
{
|
||||
// The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
|
||||
int maxEnding = 1500;
|
||||
if ( sqleMessage.length() > maxEnding )
|
||||
return (sqleMessage.substring(0, maxEnding) + "...");
|
||||
else
|
||||
return sqleMessage;
|
||||
}
|
||||
|
||||
|
||||
// This is currently not used, but it may be useful in a future scenario.
|
||||
private static long getInputFileLinesNum()
|
||||
{
|
||||
long numOfLines = 0;
|
||||
try {
|
||||
numOfLines = Files.lines(Paths.get(inputFileFullPath)).count();
|
||||
logger.debug("The numOfLines in the inputFile is " + numOfLines);
|
||||
} catch (IOException e) {
|
||||
logger.error("Could not retrieve the numOfLines. " + e);
|
||||
return -1;
|
||||
}
|
||||
return numOfLines;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method decodes a Json String and returns its members.
|
||||
* @param jsonLine String
|
||||
* @return HashMap<String,String>
|
||||
*/
|
||||
public static Task jsonDecoder(String jsonLine)
|
||||
{
|
||||
// Get ID and url and put them in the HashMap
|
||||
String idStr = null;
|
||||
String urlStr = null;
|
||||
try {
|
||||
JSONObject jObj = new JSONObject(jsonLine); // Construct a JSONObject from the retrieved jsonLine.
|
||||
idStr = jObj.get("id").toString();
|
||||
urlStr = jObj.get("url").toString();
|
||||
} catch (JSONException je) {
|
||||
logger.warn("JSONException caught when tried to parse and extract values from jsonLine: \t" + jsonLine, je);
|
||||
return null;
|
||||
}
|
||||
|
||||
if ( urlStr.isEmpty() ) {
|
||||
if ( !idStr.isEmpty() ) // If we only have the id, then go and log it.
|
||||
logger.warn("The url was not found for id: \"" + idStr + "\"");
|
||||
return null;
|
||||
}
|
||||
return new Task(idStr, urlStr, null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method parses a Json file and extracts the urls, along with the IDs.
|
||||
* @return HashMultimap<String, String>
|
||||
*/
|
||||
public static HashMultimap<String, String> getNextIdUrlPairBatchFromJson()
|
||||
{
|
||||
Task inputIdUrlTuple;
|
||||
int expectedPathsPerID = 5;
|
||||
int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;
|
||||
|
||||
HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);
|
||||
|
||||
int curBeginning = fileIndex.get();
|
||||
|
||||
while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
|
||||
{// While (!EOF) and inside the current url-batch, iterate through lines.
|
||||
|
||||
//logger.debug("fileIndex: " + FileUtils.fileIndex.get()); // DEBUG!
|
||||
|
||||
// Take each line, remove potential double quotes.
|
||||
String retrievedLineStr = inputScanner.get().nextLine();
|
||||
//logger.debug("Loaded from inputFile: " + retrievedLineStr); // DEBUG!
|
||||
|
||||
fileIndex.set(fileIndex.get() +1);
|
||||
|
||||
if ( retrievedLineStr.isEmpty() ) {
|
||||
unretrievableInputLines.set(unretrievableInputLines.get() +1);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) { // Decode the jsonLine and take the two attributes.
|
||||
logger.warn("A problematic inputLine found: \t" + retrievedLineStr);
|
||||
unretrievableInputLines.set(unretrievableInputLines.get() +1);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) ) { // We have a duplicate url in the input.. log it here as we cannot pass it through the HashMultimap. It's possible that this as well as the original might be/give a docUrl.
|
||||
duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1);
|
||||
}
|
||||
}
|
||||
|
||||
return idAndUrlMappedInput;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
|
||||
* @return loadedUrls
|
||||
*/
|
||||
public static int getCurrentlyLoadedUrls() // In the end, it gives the total number of urls we have processed.
|
||||
{
|
||||
return FileUtils.fileIndex.get() - FileUtils.unretrievableInputLines.get();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method checks if there is no more input-data and returns true in that case.
|
||||
* Otherwise, it returns false, if there is more input-data to be loaded.
|
||||
* A "RuntimeException" is thrown if no input-urls were retrieved in general.
|
||||
* @param isEmptyOfData
|
||||
* @param isFirstRun
|
||||
* @return finished loading / not finished
|
||||
* @throws RuntimeException
|
||||
*/
|
||||
public static boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun)
|
||||
{
|
||||
if ( isEmptyOfData ) {
|
||||
if ( isFirstRun )
|
||||
logger.error("Could not retrieve any urls from the inputFile!");
|
||||
else
|
||||
logger.debug("Done loading " + FileUtils.getCurrentlyLoadedUrls() + " urls from the inputFile.");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
package eu.openaire.urls_controller.util;
|
||||
|
||||
import io.minio.*;
|
||||
import io.minio.messages.Bucket;
|
||||
import io.minio.messages.Item;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.List;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@Component
|
||||
public class S3ObjectStore {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
|
||||
|
||||
private String s3Protocol = "s3://";
|
||||
@Value("services.pdfaggregation.controller.s3.endpoint")
|
||||
private String endpoint = null; // This is useful to be "public", to test file-locations.
|
||||
@Value("services.pdfaggregation.controller.s3.accessKey")
|
||||
private String accessKey = null;
|
||||
@Value("services.pdfaggregation.controller.s3.secretKey")
|
||||
private String secretKey = null;
|
||||
@Value("services.pdfaggregation.controller.s3.region")
|
||||
private String region = null;
|
||||
@Value("services.pdfaggregation.controller.s3.bucketName")
|
||||
private String bucketName = null;
|
||||
|
||||
@Value("services.pdfaggregation.controller.s3.shouldEmptyBucket")
|
||||
private boolean shouldEmptyBucket = false; // Set true only for testing!
|
||||
@Value("services.pdfaggregation.controller.s3.shouldShowAllS3Buckets")
|
||||
private boolean shouldShowAllS3Buckets = false;
|
||||
|
||||
private MinioClient minioClient;
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Exception {
|
||||
this.minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).region(region).build();
|
||||
|
||||
boolean bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
|
||||
|
||||
// Keep this commented-out to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing).
|
||||
if ( bucketExists && shouldEmptyBucket ) {
|
||||
emptyBucket(bucketName, false);
|
||||
//throw new RuntimeException("stop just for test!");
|
||||
}
|
||||
|
||||
// Make the bucket, if not exist.
|
||||
if ( !bucketExists ) {
|
||||
logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it..");
|
||||
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
|
||||
} else
|
||||
logger.debug("Bucket \"" + bucketName + "\" already exists.");
|
||||
|
||||
if ( shouldShowAllS3Buckets ) {
|
||||
List<Bucket> buckets = null;
|
||||
try {
|
||||
buckets = minioClient.listBuckets();
|
||||
logger.debug("The buckets in the S3 ObjectStore are:");
|
||||
for ( Bucket bucket : buckets ) {
|
||||
logger.debug(bucket.name());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("Could not listBuckets: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final Pattern EXTENSION_PATTERN = Pattern.compile("(\\.[^.]+)$");
|
||||
|
||||
/**
|
||||
* @param fileObjKeyName = "**File object key name**";
|
||||
* @param fileFullPath = "**Path of the file to upload**";
|
||||
* @return the url of the uploaded file
|
||||
*/
|
||||
public String uploadToS3(String fileObjKeyName, String fileFullPath) throws Exception {
|
||||
String contentType = null;
|
||||
|
||||
// Take the Matcher to retrieve the extension.
|
||||
Matcher extensionMatcher = EXTENSION_PATTERN.matcher(fileFullPath);
|
||||
if ( extensionMatcher.find() ) {
|
||||
String extension = null;
|
||||
if ( (extension = extensionMatcher.group(0)) == null )
|
||||
contentType = "application/pdf";
|
||||
else {
|
||||
if ( extension.equals("pdf") )
|
||||
contentType = "application/pdf";
|
||||
/*else if ( *//* TODO - other-extension-match *//* )
|
||||
contentType = "application/pdf"; */
|
||||
else
|
||||
contentType = "application/pdf";
|
||||
}
|
||||
} else {
|
||||
logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType.");
|
||||
contentType = "application/pdf";
|
||||
}
|
||||
|
||||
minioClient.uploadObject(UploadObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(fileObjKeyName).filename(fileFullPath)
|
||||
.contentType(contentType).build());
|
||||
|
||||
// TODO - What if the fileObjKeyName already exists?
|
||||
// Right now it gets overwritten (unless we add versioning, which is irrelevant for different objects..)
|
||||
|
||||
String s3Url = s3Protocol + bucketName + "/" + fileObjKeyName; // Be aware: This url works only if the access to the bucket is public.
|
||||
//logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url);
|
||||
return s3Url;
|
||||
}
|
||||
|
||||
public void emptyBucket(String bucketName, boolean shouldDeleteBucket) throws Exception {
|
||||
logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"");
|
||||
|
||||
// First list the objects of the bucket.
|
||||
Iterable<Result<Item>> results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
|
||||
|
||||
// Then, delete the objects.
|
||||
for ( Result<Item> resultItem : results )
|
||||
try {
|
||||
deleteFile(resultItem.get().objectName(), bucketName);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Could not remove " + resultItem.get().objectName());
|
||||
}
|
||||
|
||||
if ( shouldDeleteBucket ) {
|
||||
// Lastly, delete the empty bucket.
|
||||
minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean locationInStore(String location) {
|
||||
return location.startsWith(endpoint);
|
||||
}
|
||||
|
||||
private void deleteFile(String fileObjKeyName, String bucketName) throws Exception {
|
||||
minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(fileObjKeyName).build());
|
||||
}
|
||||
}
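
A hypothetical usage sketch of this component from another Spring bean; the method, file name and path below are placeholders, not part of the class above:

@Autowired
private S3ObjectStore s3ObjectStore;

public String storeFullText(String fileName, String fileFullPath) {
    try {
        return s3ObjectStore.uploadToS3(fileName, fileFullPath); // Returns e.g. "s3://<bucketName>/<fileName>".
    } catch (Exception e) {
        logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore.", e);
        return null; // The caller can then mark the related payloads as unretrieved.
    }
}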
|
|
@ -1,231 +0,0 @@
|
|||
package eu.openaire.urls_controller.util;
|
||||
|
||||
import io.minio.*;
|
||||
import io.minio.messages.Bucket;
|
||||
import io.minio.messages.Item;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
import java.util.Scanner;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
||||
public class S3ObjectStoreMinIO {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(S3ObjectStoreMinIO.class);
|
||||
|
||||
public static String s3Protocol = "s3://";
|
||||
private static String endpoint = null;
|
||||
private static String accessKey = null;
|
||||
private static String secretKey = null;
|
||||
private static String region = null;
|
||||
private static String bucketName = null;
|
||||
|
||||
private static MinioClient minioClient;
|
||||
|
||||
public static final boolean shouldEmptyBucket = false; // Set true only for testing!
|
||||
public static final String credentialsFilePath = System.getProperty("user.dir") + File.separator + "S3_minIO_credentials.txt";
|
||||
private static final boolean shouldShowAllS3Buckets = false;
|
||||
|
||||
|
||||
/**
|
||||
* This must be called before any other methods.
|
||||
* */
|
||||
public S3ObjectStoreMinIO()
|
||||
{
|
||||
// Take the credentials from the file.
|
||||
Scanner myReader = null;
|
||||
try {
|
||||
File credentialsFile = new File(credentialsFilePath);
|
||||
if ( !credentialsFile.exists() ) {
|
||||
throw new RuntimeException("credentialsFile \"" + credentialsFilePath + "\" does not exists!");
|
||||
}
|
||||
myReader = new Scanner(credentialsFile);
|
||||
if ( myReader.hasNextLine() ) {
|
||||
String[] credentials = myReader.nextLine().split(",");
|
||||
if ( credentials.length < 5 ) {
|
||||
throw new RuntimeException("Not all credentials were retrieved from file \"" + credentialsFilePath + "\"!");
|
||||
}
|
||||
endpoint = credentials[0].trim();
|
||||
accessKey = credentials[1].trim();
|
||||
secretKey = credentials[2].trim();
|
||||
region = credentials[3].trim();
|
||||
bucketName = credentials[4].trim();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String errorMsg = "An error prevented the retrieval of the minIO credentials from the file: " + credentialsFilePath + "\n" + e.getMessage();
|
||||
logger.error(errorMsg, e);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(53);
|
||||
} finally {
|
||||
if ( myReader != null )
|
||||
myReader.close();
|
||||
}
|
||||
|
||||
if ( (endpoint == null) || (accessKey == null) || (secretKey == null) || (region == null) || (bucketName == null) ) {
|
||||
String errorMsg = "No \"endpoint\" or/and \"accessKey\" or/and \"secretKey\" or/and \"region\" or/and \"bucketName\" could be retrieved from the file: " + credentialsFilePath;
|
||||
logger.error(errorMsg);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(54);
|
||||
}
|
||||
// It's not safe, nor helpful to show the credentials in the logs.
|
||||
|
||||
minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).region(region).build();
|
||||
|
||||
boolean bucketExists = false;
|
||||
try {
|
||||
bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
|
||||
} catch (Exception e) {
|
||||
String errorMsg = "There was a problem while checking if the bucket \"" + bucketName + "\" exists!\n" + e.getMessage();
|
||||
logger.error(errorMsg);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(55);
|
||||
}
|
||||
|
||||
// Keep this commented-out to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing).
|
||||
/* if ( bucketExists && shouldEmptyBucket ) {
|
||||
emptyBucket(bucketName, false);
|
||||
//throw new RuntimeException("stop just for test!");
|
||||
}*/
|
||||
|
||||
// Make the bucket, if not exist.
|
||||
try {
|
||||
if ( !bucketExists ) {
|
||||
logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it..");
|
||||
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
|
||||
} else
|
||||
logger.debug("Bucket \"" + bucketName + "\" already exists.");
|
||||
} catch (Exception e) {
|
||||
String errorMsg = "Could not create the bucket \"" + bucketName + "\"!";
|
||||
logger.error(errorMsg, e);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(56);
|
||||
}
|
||||
|
||||
if ( shouldShowAllS3Buckets ) {
|
||||
List<Bucket> buckets = null;
|
||||
try {
|
||||
buckets = minioClient.listBuckets();
|
||||
logger.debug("The buckets in the S3 ObjectStore are:");
|
||||
for ( Bucket bucket : buckets ) {
|
||||
logger.debug(bucket.name());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("Could not listBuckets: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static final Pattern EXTENSION_PATTERN = Pattern.compile("(\\.[^.]+)$");
|
||||
|
||||
/**
|
||||
* @param fileObjKeyName = "**File object key name**";
|
||||
* @param fileFullPath = "**Path of the file to upload**";
|
||||
* @return
|
||||
*/
|
||||
public static String uploadToS3(String fileObjKeyName, String fileFullPath)
|
||||
{
|
||||
String contentType = null;
|
||||
|
||||
// Take the Matcher to retrieve the extension.
|
||||
Matcher extensionMatcher = EXTENSION_PATTERN.matcher(fileFullPath);
|
||||
if ( extensionMatcher.find() ) {
|
||||
String extension = null;
|
||||
if ( (extension = extensionMatcher.group(0)) == null )
|
||||
contentType = "application/pdf";
|
||||
else {
|
||||
if ( extension.equals("pdf") )
|
||||
contentType = "application/pdf";
|
||||
/*else if ( *//* TODO - other-extension-match *//* )
|
||||
contentType = "application/pdf"; */
|
||||
else
|
||||
contentType = "application/pdf";
|
||||
}
|
||||
} else {
|
||||
logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType.");
|
||||
contentType = "application/pdf";
|
||||
}
|
||||
|
||||
ObjectWriteResponse response;
|
||||
try {
|
||||
response = minioClient.uploadObject(UploadObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(fileObjKeyName).filename(fileFullPath)
|
||||
.contentType(contentType).build());
|
||||
|
||||
// TODO - What if the fileObjKeyName already exists?
|
||||
// Right now it gets overwritten (unless we add versioning, which is irrelevant for different objects..)
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not upload the file \"" + fileObjKeyName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
|
||||
return null;
|
||||
}
|
||||
|
||||
// The urls of OpenAIRE-S3 do not respond PUBLICLY (that's ok). But they also return a 403-Forbidden when requested from a machine with access to them.
|
||||
// An example of such a url is: https://<DOMAIN>//<BUCKET>/doiboost____::3f2fb79f97627fd94c45e694d2a8aa30.pdf
|
||||
// That type of url is usable only in the test S3-Object-Store.
|
||||
// We prefer the following scheme: s3://<BUCKET>//doiboost____::3f2fb79f97627fd94c45e694d2a8aa30.pdf
|
||||
|
||||
String s3Url = s3Protocol + bucketName + "/" + fileObjKeyName;
|
||||
//logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url);
|
||||
return s3Url;
|
||||
}
|
||||
|
||||
|
||||
public static boolean emptyBucket(String bucketName, boolean shouldDeleteBucket)
|
||||
{
|
||||
logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"");
|
||||
|
||||
// First list the objects of the bucket.
|
||||
Iterable<Result<Item>> results;
|
||||
try {
|
||||
results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Then, delete the objects.
|
||||
for ( Result<Item> resultItem : results ) {
|
||||
try {
|
||||
if ( !deleteFile(resultItem.get().objectName(), bucketName) ) {
|
||||
logger.error("Cannot proceed with bucket deletion, since only an empty bucket can be removed!");
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error getting the object from resultItem: " + resultItem.toString() + "\nThe bucket \"" + bucketName + "\" will not be able to be deleted! Exception message: " + e.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if ( shouldDeleteBucket ) {
|
||||
// Lastly, delete the empty bucket.
|
||||
try {
|
||||
minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not delete the bucket \"" + bucketName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("Bucket " + bucketName + " was " + (shouldDeleteBucket ? "deleted!" : "emptied!"));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public static boolean deleteFile(String fileObjKeyName, String bucketName)
|
||||
{
|
||||
try {
|
||||
minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(fileObjKeyName).build());
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not delete the file \"" + fileObjKeyName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,147 @@
|
|||
package eu.openaire.urls_controller.util;
|
||||
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import eu.openaire.urls_controller.models.Task;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.configurationprocessor.json.JSONException;
|
||||
import org.springframework.boot.configurationprocessor.json.JSONObject;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Scanner;
|
||||
|
||||
@Component
|
||||
public class TestFileUtils {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(TestFileUtils.class);
|
||||
|
||||
@Value("classpath:testInputFiles/orderedList1000.json")
|
||||
Resource testResource;
|
||||
|
||||
public ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<>();
|
||||
public ThreadLocal<Scanner> inputScanner = new ThreadLocal<>(); // Every Thread has its own variable.
|
||||
|
||||
private final int jsonBatchSize = 3000;
|
||||
private final ThreadLocal<Integer> fileIndex = new ThreadLocal<>();
|
||||
private final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<>();
|
||||
|
||||
private final String utf8Charset = "UTF-8";
|
||||
|
||||
public TestFileUtils() throws IOException {
|
||||
String resourceFileName = "testInputFiles/orderedList1000.json";
|
||||
|
||||
InputStream inputStream = testResource.getInputStream();
|
||||
if ( inputStream == null )
|
||||
throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");
|
||||
|
||||
logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);
|
||||
|
||||
inputScanner.set(new Scanner(inputStream, utf8Charset));
|
||||
|
||||
fileIndex.set(0);
|
||||
unretrievableInputLines.set(0);
|
||||
duplicateIdUrlEntries.set(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method parses a Json file and extracts the urls, along with the IDs.
|
||||
* @return HashMultimap<String, String>
|
||||
*/
|
||||
public HashMultimap<String, String> getNextIdUrlPairBatchFromJson() {
|
||||
Task inputIdUrlTuple;
|
||||
int expectedPathsPerID = 5;
|
||||
int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;
|
||||
|
||||
HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);
|
||||
|
||||
int curBeginning = fileIndex.get();
|
||||
|
||||
while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
|
||||
{// While (!EOF) and inside the current url-batch, iterate through lines.
|
||||
|
||||
//logger.debug("fileIndex: " + FileUtils.fileIndex.get()); // DEBUG!
|
||||
|
||||
// Take each line, remove potential double quotes.
|
||||
String retrievedLineStr = inputScanner.get().nextLine();
|
||||
//logger.debug("Loaded from inputFile: " + retrievedLineStr); // DEBUG!
|
||||
|
||||
fileIndex.set(fileIndex.get() +1);
|
||||
|
||||
if ( retrievedLineStr.isEmpty() ) {
|
||||
unretrievableInputLines.set(unretrievableInputLines.get() +1);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) { // Decode the jsonLine and take the two attributes.
|
||||
logger.warn("A problematic inputLine found: \t" + retrievedLineStr);
|
||||
unretrievableInputLines.set(unretrievableInputLines.get() +1);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) ) { // We have a duplicate url in the input.. log it here as we cannot pass it through the HashMultimap. It's possible that this as well as the original might be/give a docUrl.
|
||||
duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1);
|
||||
}
|
||||
}
|
||||
|
||||
return idAndUrlMappedInput;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method decodes a Json String and returns its members.
|
||||
* @param jsonLine String
|
||||
* @return HashMap<String,String>
|
||||
*/
|
||||
private Task jsonDecoder(String jsonLine) {
|
||||
// Get ID and url and put them in the HashMap
|
||||
String idStr = null;
|
||||
String urlStr = null;
|
||||
try {
|
||||
JSONObject jObj = new JSONObject(jsonLine); // Construct a JSONObject from the retrieved jsonLine.
|
||||
idStr = jObj.get("id").toString();
|
||||
urlStr = jObj.get("url").toString();
|
||||
} catch (JSONException je) {
|
||||
logger.warn("JSONException caught when tried to parse and extract values from jsonLine: \t" + jsonLine, je);
|
||||
return null;
|
||||
}
|
||||
|
||||
if ( urlStr.isEmpty() ) {
|
||||
if ( !idStr.isEmpty() ) // If we only have the id, then go and log it.
|
||||
logger.warn("The url was not found for id: \"" + idStr + "\"");
|
||||
return null;
|
||||
}
|
||||
return new Task(idStr, urlStr, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method checks if there is no more input-data and returns true in that case.
|
||||
* Otherwise, it returns false, if there is more input-data to be loaded.
|
||||
* A "RuntimeException" is thrown if no input-urls were retrieved in general.
|
||||
* @param isEmptyOfData
|
||||
* @param isFirstRun
|
||||
* @return finished loading / not finished
|
||||
* @throws RuntimeException
|
||||
*/
|
||||
public boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun) {
|
||||
if ( isEmptyOfData ) {
|
||||
if ( isFirstRun )
|
||||
logger.error("Could not retrieve any urls from the inputFile!");
|
||||
else
|
||||
logger.debug("Done loading " + getCurrentlyLoadedUrls() + " urls from the inputFile.");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
|
||||
* @return loadedUrls
|
||||
*/
|
||||
private int getCurrentlyLoadedUrls() { // In the end, it gives the total number of urls we have processed.
|
||||
return fileIndex.get() - unretrievableInputLines.get();
|
||||
}
|
||||
}
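
A hypothetical driver loop over this component, mirroring how the test-endpoint consumes it (the surrounding method and field names are illustrative):

@Autowired
private TestFileUtils testFileUtils;

public void loadAllTestPairs() {
    boolean isFirstRun = true;
    while ( true ) {
        HashMultimap<String, String> batch = testFileUtils.getNextIdUrlPairBatchFromJson();
        if ( testFileUtils.isFinishedLoading(batch.isEmpty(), isFirstRun) )
            break; // Either the input is exhausted, or (on the first run) nothing could be read at all.
        isFirstRun = false;
        // ... turn the (id, url) pairs of "batch" into Assignment objects here ...
    }
}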
|
|
@ -13,73 +13,72 @@ import java.net.URL;
|
|||

public class UriBuilder {

    private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class);

    public static String baseUrl = null;

    public UriBuilder(Environment environment)
    {
        baseUrl = "http";
        String sslEnabled = environment.getProperty("server.ssl.enabled");
        if ( sslEnabled == null ) { // It's expected to be missing when there is no SSL-configuration.
            logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP..");
            sslEnabled = "false";
        }
        baseUrl += sslEnabled.equals("true") ? "s" : "";
        baseUrl += "://";

        String hostName = getPublicIP();
        if ( hostName == null )
            hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null.

        baseUrl += hostName;
        String serverPort = environment.getProperty("server.port");
        if ( serverPort == null ) { // This is unacceptable!
            logger.error("No property \"server.port\" was found in \"application.properties\"!");
            System.exit(-1); // Spring Boot would probably not start in this case anyway.
        }
        baseUrl += ":" + serverPort;

        String baseInternalPath = environment.getProperty("server.servlet.context-path");
        if ( baseInternalPath != null ) {
            if ( !baseInternalPath.startsWith("/") )
                baseUrl += "/";
            baseUrl += baseInternalPath;
            if ( !baseInternalPath.endsWith("/") )
                baseUrl += "/";
        } else {
            logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Its absence is expected in some setups.
            baseUrl += "/";
        }

        logger.debug("ServerBaseURL: " + baseUrl);
    }

    private static String getPublicIP()
    {
        String publicIpAddress = "";
        URL url_name;
        try {
            url_name = new URL("https://api.ipify.org/");
        } catch (MalformedURLException mue) {
            logger.warn(mue.getMessage());
            return null;
        }
        try ( BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream())) ) {
            publicIpAddress = bf.readLine().trim();
        } catch (Exception e) {
            logger.warn("Cannot get the publicIP address for this machine!", e);
            return null;
        }
        return publicIpAddress;
    }

    public static String getBaseUrl() {
        return baseUrl;
    }

    public static void setBaseUrl(String baseUrl) {
        UriBuilder.baseUrl = baseUrl;
    }

//    private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class);
//
//    public static String baseUrl = null;
//
//    public UriBuilder(Environment environment) {
//        baseUrl = "http";
//        String sslEnabled = environment.getProperty("server.ssl.enabled");
//        if (sslEnabled == null) { // It's expected to not exist if there is no SSL-configuration.
//            logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP..");
//            sslEnabled = "false";
//        }
//        baseUrl += sslEnabled.equals("true") ? "s" : "";
//        baseUrl += "://";
//
//        String hostName = getPublicIP();
//        if ( hostName == null )
//            hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null.
//
//        baseUrl += hostName;
//        String serverPort = environment.getProperty("server.port");
//        if (serverPort == null) { // This is unacceptable!
//            logger.error("No property \"server.port\" was found in \"application.properties\"!");
//            System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway.
//        }
//        baseUrl += ":" + serverPort;
//
//        String baseInternalPath = environment.getProperty("server.servlet.context-path");
//        if ( baseInternalPath != null ) {
//            if ( !baseInternalPath.startsWith("/") )
//                baseUrl += "/";
//            baseUrl += baseInternalPath;
//            if ( !baseInternalPath.endsWith("/") )
//                baseUrl += "/";
//        } else {
//            logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected.
//            baseUrl += "/";
//        }
//
//        logger.debug("ServerBaseURL: " + baseUrl);
//    }
//
//    private String getPublicIP()
//    {
//        String publicIpAddress = "";
//        URL url_name;
//        try {
//            url_name = new URL("https://api.ipify.org/");
//        } catch (MalformedURLException mue) {
//            logger.warn(mue.getMessage());
//            return null;
//        }
//        try (BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) {
//            publicIpAddress = bf.readLine().trim();
//        } catch (Exception e) {
//            logger.warn("Cannot get the publicIP address for this machine!", e);
//            return null;
//        }
//        return publicIpAddress;
//    }
//
//    public static String getBaseUrl() {
//        return baseUrl;
//    }
//
//    public static void setBaseUrl(String baseUrl) {
//        UriBuilder.baseUrl = baseUrl;
//    }

}
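
A minimal sketch (not part of the source) of the base-URL this class assembles under the HTTP configuration shown in the next file (server.port = 1880, server.servlet.context-path = /api), assuming the public-IP lookup falls back to the loopback host, which is an assumption made only for this illustration:

    // Hypothetical composition, mirroring the constructor above; "localhost" stands in for the public IP.
    String baseUrl = "http" + "://" + "localhost" + ":" + "1880" + "/api" + "/";
    // -> "http://localhost:1880/api/"
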
@@ -1,30 +1,28 @@
# HTTPS CONFIGURATION
#server.port = 8443
#server.ssl.enabled = true
#server.ssl.key-store = src/main/resources/keystore.p12
#server.ssl.key-store-type = PKCS12
#server.ssl.key-alias = tomcat
#server.ssl.key-store-password = urls_controller_project
#server.tomcat.remoteip.remote-ip-header = x-your-remote-ip-header
#server.tomcat.remoteip.protocol-header = x-your-protocol-header
#server.error.include-stacktrace=never

# HTTP CONFIGURATION
server.port = 1880

# Server api path
server.servlet.context-path=/api

# Service config
services.pdfaggregation.controller.db.oldDatabaseName = pdfaggregation_i
services.pdfaggregation.controller.db.databaseName = pdfAggregationDatabase
services.pdfaggregation.controller.baseTargetLocation = /tmp/
services.pdfaggregation.controller.maxAttemptsPerRecord = 3
services.pdfaggregation.controller.assignmentLimit = 10000

services.pdfaggregation.controller.s3.endpoint = xa
services.pdfaggregation.controller.s3.accessKey = xa
services.pdfaggregation.controller.s3.secretKey = xa
services.pdfaggregation.controller.s3.region = xa
services.pdfaggregation.controller.s3.bucketName = xa
services.pdfaggregation.controller.s3.shouldEmptyBucket = false
services.pdfaggregation.controller.s3.shouldShowAllS3Buckets = true
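
As a hedged illustration of how the s3.* settings above could be consumed (a sketch only, not the project's actual S3ObjectStoreMinIO code; the "xa" placeholders and the endpoint shown here are stand-ins for the real values), the MinIO 8.x Java client is built roughly like this:

    import io.minio.BucketExistsArgs;
    import io.minio.MinioClient;

    public class S3ClientSketch {
        public static void main(String[] args) throws Exception {
            // Values corresponding to the "services.pdfaggregation.controller.s3.*" properties above.
            String endpoint = "https://some-s3-endpoint.example.org"; // assumption: the real endpoint replaces "xa"
            String accessKey = "xa";
            String secretKey = "xa";
            String region = "xa";
            String bucketName = "xa";

            MinioClient minioClient = MinioClient.builder()
                    .endpoint(endpoint)
                    .credentials(accessKey, secretKey)
                    .region(region)
                    .build();

            // Verify that the configured bucket exists before uploading anything.
            boolean exists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
            System.out.println("Bucket \"" + bucketName + "\" exists: " + exists);
        }
    }
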

# Database

spring.impala.url = jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/
# Note: The "UseNativeQuery" option does not work with PreparedStatements! Also, the url does not work without the trailing "/".
# The username and the password do not matter, since this app always runs on a pre-authenticated machine.

spring.impala.oldDatabaseName = pdfaggregation_i
spring.impala.databaseName = pdfAggregationDatabase

spring.impala.driver-class-name = com.cloudera.impala.jdbc41.Driver
spring.datasource.url=jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/
spring.datasource.username=
spring.datasource.password=
spring.datasource.driver-class-name=com.cloudera.impala.jdbc41.Driver

spring.datasource.hikari.pool-name=ControllerPool
spring.datasource.hikari.maximumPoolSize=20
@@ -33,7 +31,6 @@ spring.datasource.hikari.minimumIdle=4
spring.datasource.hikari.connectionTimeout=30000
spring.datasource.hikari.idleTimeout=600000
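
For orientation, a minimal sketch of how the datasource and Hikari properties above map onto a programmatic HikariCP setup (an illustration only, not the Controller's actual ImpalaConnector code):

    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;

    public class PoolSketch {
        public static void main(String[] args) {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl("jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/");
            config.setDriverClassName("com.cloudera.impala.jdbc41.Driver");
            config.setUsername("");   // username/password are irrelevant on a pre-authenticated machine
            config.setPassword("");
            config.setPoolName("ControllerPool");
            config.setMaximumPoolSize(20);
            config.setMinimumIdle(4);
            config.setConnectionTimeout(30000);   // milliseconds
            config.setIdleTimeout(600000);        // milliseconds

            try (HikariDataSource dataSource = new HikariDataSource(config)) {
                // The pool is initialized here; connections are obtained via dataSource.getConnection().
                System.out.println("Pool \"" + dataSource.getPoolName() + "\" initialized.");
            }
        }
    }
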


# LOGGING LEVELS
logging.level.root=INFO
logging.level.org.springframework.web=INFO
@@ -41,17 +38,13 @@ logging.level.org.springframework.security=WARN
logging.level.eu.openaire.urls_controller=DEBUG
spring.output.ansi.enabled=always


## MULTIPART (MultipartProperties)

# Enable multipart uploads.
spring.servlet.multipart.enabled=true

# Threshold after which files are written to disk.
spring.servlet.multipart.file-size-threshold=2KB

# Max file size.
spring.servlet.multipart.max-file-size=200MB

# Max request size.
spring.servlet.multipart.max-request-size=215MB
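
These multipart limits govern upload endpoints such as the following hedged sketch (a hypothetical controller written only for illustration; the real Controller's endpoints may differ):

    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.multipart.MultipartFile;

    @RestController
    public class UploadControllerSketch {

        // Files up to "max-file-size" (200MB) are accepted; the whole request may not exceed "max-request-size" (215MB).
        @PostMapping("uploadSketch")
        public ResponseEntity<String> upload(@RequestParam("file") MultipartFile file) {
            return ResponseEntity.ok("Received \"" + file.getOriginalFilename() + "\" (" + file.getSize() + " bytes).");
        }
    }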