document-store-lib-postgresql/src/main/java/org/gcube/documentstore/persistence/StatementMap.java

77 lines
2.5 KiB
Java

package org.gcube.documentstore.persistence;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.gcube.accounting.utility.postgresql.RecordToDBConnection;
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
/**
 * Per-thread cache of JDBC {@link Statement}s, keyed by record type.
 *
 * <p>A statement is created lazily the first time a record of a given type is
 * seen by the current thread and is reused until {@link #close()} is called,
 * which closes every cached statement and commits and closes its connection.
 */
public class StatementMap {

    /*
     * The map is stored per thread and lazily initialised to an empty HashMap.
     *
     * NOTE(review): this is an InheritableThreadLocal without a childValue()
     * override, so a thread created after its parent has populated the map
     * starts with a reference to the SAME HashMap (and the same Statement
     * objects) as the parent. Confirm this sharing is intended; if not, a
     * plain ThreadLocal would give every thread its own statements.
     */
    private static final InheritableThreadLocal<Map<String, Statement>> statementsThreadLocal = new InheritableThreadLocal<Map<String, Statement>>() {

        @Override
        protected Map<String, Statement> initialValue() {
            return new HashMap<String, Statement>();
        }

    };

    /** Configuration handed to {@code RecordToDBMapping.addRecordToDB} when a record class has no DB info yet. */
    private final PersistenceBackendConfiguration configuration;

    /**
     * Stores the configuration and calls {@code RecordToDBMapping.getRecordToDB}
     * for every aggregated record class reported by
     * {@code RecordUtility.getAggregatedRecordClassesFound()} (presumably to set
     * up the record-to-DB mappings up front; verify against RecordToDBMapping).
     *
     * @param configuration the persistence backend configuration
     * @throws RuntimeException wrapping the original failure if the mapping of
     *         any record class cannot be obtained
     */
    public StatementMap(PersistenceBackendConfiguration configuration) {
        this.configuration = configuration;
        Map<String, Class<? extends AggregatedRecord<?,?>>> aggregatedRecords = RecordUtility.getAggregatedRecordClassesFound();
        for (Map.Entry<String, Class<? extends AggregatedRecord<?,?>>> entry : aggregatedRecords.entrySet()) {
            try {
                RecordToDBMapping.getRecordToDB(entry.getValue());
            } catch (Exception e) {
                // The original code built this exception but never threw it,
                // which silently discarded the failure.
                throw new RuntimeException("Unable to get the RecordToDB mapping for record type " + entry.getKey(), e);
            }
        }
    }

    /**
     * Returns the connection provided by the {@link RecordToDBConnection}
     * associated with the given record class, registering the class with
     * {@code RecordToDBMapping.addRecordToDB} first when no DB info exists yet.
     *
     * @param clz the aggregated record class
     * @return the connection obtained from the record's DB info
     * @throws Exception if the DB info cannot be registered or the connection
     *         cannot be obtained
     */
    protected Connection getConnection(Class<? extends AggregatedRecord<?, ?>> clz) throws Exception {
        RecordToDBConnection recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz);
        if (recordDBInfo == null) {
            RecordToDBMapping.addRecordToDB(clz, configuration);
            recordDBInfo = RecordToDBMapping.getRecordDBInfo(clz);
        }
        return recordDBInfo.getConnection();
    }

    /**
     * Returns the statement cached for the record's type in the current
     * thread's map, creating and caching it on first use.
     *
     * @param record the record whose type selects the statement
     * @return the cached or newly created statement
     * @throws Exception if the connection or the statement cannot be created
     */
    public synchronized Statement getStatement(Record record) throws Exception {
        Map<String, Statement> map = statementsThreadLocal.get();
        String type = record.getRecordType();
        Statement statement = map.get(type);
        if (statement == null) {
            Class<? extends AggregatedRecord<?, ?>> clz = RecordUtility.getAggregatedRecordClass(type);
            Connection connection = getConnection(clz);
            statement = connection.createStatement();
            map.put(type, statement);
        }
        return statement;
    }

    /**
     * Closes every statement cached for the current thread, committing and
     * closing the connection behind each one, and clears the thread's cache.
     *
     * <p>All statements are processed even if some of them fail; the first
     * failure encountered is rethrown once the loop has finished.
     *
     * @throws Exception the first failure raised while closing a statement or
     *         committing/closing its connection
     */
    public synchronized void close() throws Exception {
        Map<String, Statement> map = statementsThreadLocal.get();
        // Detach the map from the thread first so that a later getStatement()
        // starts from a fresh map even if closing fails below.
        statementsThreadLocal.remove();
        Exception firstFailure = null;
        for (Statement statement : map.values()) {
            try {
                closeStatementAndConnection(statement);
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Closes the statement, then commits and closes the connection that created it. */
    private static void closeStatementAndConnection(Statement statement) throws Exception {
        // Fetch the connection BEFORE closing the statement: per the JDBC
        // contract getConnection() may throw when invoked on a closed Statement.
        Connection connection = statement.getConnection();
        try {
            statement.close();
            connection.commit();
        } finally {
            // Always release the connection, even when close/commit failed.
            connection.close();
        }
    }

}