diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java index 0ea6059..5ea6873 100644 --- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java @@ -418,18 +418,8 @@ public class BulkImportServiceImpl implements BulkImportService { } else s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), fileNameID, fileLocationData.getDotFileExtension(), datasourceId, fileHash); - GenericData.Record record = new GenericData.Record(ParquetFileUtils.payloadsSchema); - record.put("id", openAireId); - record.put("original_url", originalUrl); - record.put("actual_url", actualUrl); - record.put("date", timeMillis); - record.put("mimetype", bulkImportSource.getMimeType()); - Long size = docFileData.getSize(); - record.put("size", ((size != null) ? String.valueOf(size) : null)); - record.put("hash", fileHash); // This is already checked and will not be null here. - record.put("location", s3Url); - record.put("provenance", ("bulk:" + provenance)); // Add the "bulk:" prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW. - return record; + return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(), + docFileData.getSize(), fileHash, s3Url, provenance, true); // It may return null. } diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index cf4199d..1061752 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -306,23 +306,13 @@ public class ParquetFileUtils { if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table. continue; - try { - record = new GenericData.Record(payloadsSchema); - record.put("id", payload.getId()); - record.put("original_url", payload.getOriginal_url()); - record.put("actual_url", payload.getActual_url()); - Timestamp timestamp = payload.getTimestamp_acquired(); - record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis()); - record.put("mimetype", payload.getMime_type()); - Long size = payload.getSize(); - record.put("size", ((size != null) ? String.valueOf(size) : null)); - record.put("hash", payload.getHash()); - record.put("location", fileLocation); - record.put("provenance", payload.getProvenance()); + Timestamp timestamp = payload.getTimestamp_acquired(); + record = getPayloadParquetRecord(payload.getId(), payload.getOriginal_url(), payload.getActual_url(), + (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis(), + payload.getMime_type(), payload.getSize(), payload.getHash(), fileLocation, payload.getProvenance(), false); + + if ( record != null ) recordList.add(record); - } catch (Exception e) { - logger.error("Failed to create a payload record!", e); - } } int recordsSize = recordList.size(); @@ -349,6 +339,30 @@ public class ParquetFileUtils { } + public GenericData.Record getPayloadParquetRecord(String id, String original_url, String actual_url, long timeMillis, String mimetype, Long size, + String hash, String fileLocation, String provenance, boolean isForBulkImport) + { + GenericData.Record record; + try { + record = new GenericData.Record(payloadsSchema); + record.put("id", id); + record.put("original_url", original_url); + record.put("actual_url", actual_url); + record.put("date", timeMillis); + record.put("mimetype", mimetype); + record.put("size", ((size != null) ? String.valueOf(size) : null)); + record.put("hash", hash); + record.put("location", fileLocation); + record.put("provenance", (isForBulkImport ? "bulk:" : "") + provenance); + // Add the "bulk:" prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW. + return record; + } catch (Exception e) { + logger.error("Failed to create a payload record!", e); + return null; + } + } + + public boolean writeToParquet(List recordList, Schema schema, String fullFilePath) { OutputFile outputFile;