728x90

목차

1. 개요

MySQL에는 아래 3가지 방법을 이용하여 중복 레코드를 관리할 수 있다.

  1. INSERT IGNORE ...
  2. REPLACE INTO ...
  3. INSERT INTO ... ON DUPLICATE UPDATE

각 방법의 특징을 요약하면 다음과 같다.

방법특징

INSERT IGNORE ... 최초 입수된 레코드가 남아 있음
최초 입수된 레코드의 AUTO_INCREMENT 값은 변하지 않음
REPLACE INTO ... 최초 입수된 레코드가 삭제되고, 신규 레코드가 INSERT됨
AUTO_INCREMENT의 값이 변경됨
INSERT INTO ... ON DUPLICATE UPDATE INSERT IGNORE의 장점 포함함
중복 키 오류 발생 시, 사용자가 UPDATE될 값을 지정할 수 있음

2. 사전 조건

중복 레코드 관리를 위해선 테이블에 PRIMARY KEY 혹은 UNIQUE INDEX가 필요하다.

본 예제에서는 아래와 같은 person 테이블을 이용하여 설명한다.

CREATE TABLE person
(
  id INT NOT NULL AUTO_INCREMENT,
  name VARCHAR(20),
  address VARCHAR(255),
  PRIMARY KEY (id),
  UNIQUE INDEX (name) -- 중복 검사용 필드
);

person 테이블에서 PRIMARY KEY로 지정된 id 필드는 AUTO_INCREMENT를 위해 만든 필드이며, 본 문서에는 name 필드를 이용하여 중복을 검사한다.

 

만약 기존에 만들어진 테이블에 PRIMARY KEY 혹은 UNIQUE INDEX를 추가하려면 아래의 SQL을 이용하면 된다.

-- PRIMARY 추가하는 방법
ALTER TABLE person ADD PRIMARY KEY (name)

-- UNIQUE INDEX를 추가하는 방법
ALTER TABLE person ADD UNIQUE INDEX (name)

3. 중복 처리 방법

3-1) INSERT IGNORE

INSERT IGNORE는 중복 키 에러가 발생했을 때 신규로 입력되는 레코드를 무시하는 단순한 방법이다.

다음의 예를 보면 중복 키 에러가 발생했을 때 INSERT 구문 자체는 오류가 발생하지 않고, 대신’0 row affected’가 출력된 것을 볼 수 있다.

mysql> INSERT IGNORE INTO person VALUES (NULL, 'James', 'Seoul');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT IGNORE INTO person VALUES (NULL, 'Cynthia', 'Yongin');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT IGNORE INTO person VALUES (NULL, 'James', 'Seongnam');
Query OK, 0 rows affected (0.00 sec)

mysql> INSERT IGNORE INTO person VALUES (NULL, 'Cynthia', 'Seoul');
Query OK, 0 rows affected (0.00 sec)

mysql> INSERT IGNORE INTO person VALUES (NULL, 'James', 'Incheon');
Query OK, 0 rows affected (0.00 sec)

SELECT의 결과는 2건만 존재한다.

mysql> SELECT * FROM person;
+----+---------+---------+
| id | name    | address |
+----+---------+---------+
|  1 | James   | Seoul   |
|  2 | Cynthia | Yongin  |
+----+---------+---------+
2 rows in set (0.00 sec)

James의 주소가 최초 입력한 Seoul이다. 즉, 최초에 입력된 레코드가 남아 있는 걸을 볼 수 있다.

또한 AUTO_INCREMENT 컬럼의 값이 1, 2인 것에 주목하라.

MySQL에서 AUTO_INCREMENT는 식별키 용도로 많이 사용하는데, 중복 발생 여부에 따라 식별 키가 변경되는 경우 여러 가지 불편한 점이 생긴다.

INSERT IGNORE에서는 AUTO_INCREMENT의 값이 변경되지 않는다는 장점이 있다.

3-2) REPLACE INTO

REPLACE INTO는 중복이 발생되었을 때 기존 레코드를 삭제하고 신규 레코드를 INSERT하는 방식이다.

person 테이블을 drop 후 다시 생성한 뒤에 아래의 레코드를 입수해보자.

mysql> REPLACE INTO person VALUES (NULL, 'James', 'Seoul');
Query OK, 1 row affected (0.00 sec)

mysql> REPLACE INTO person VALUES (NULL, 'Cynthia', 'Yongin');
Query OK, 1 row affected (0.00 sec)

mysql> REPLACE INTO person VALUES (NULL, 'James', 'Seongnam');
Query OK, 2 rows affected (0.00 sec)

mysql> REPLACE INTO person VALUES (NULL, 'Cynthia', 'Seoul');
Query OK, 2 rows affected (0.00 sec)

mysql> REPLACE INTO person VALUES (NULL, 'James', 'Incheon');
Query OK, 2 rows affected (0.00 sec)

세번째 레코드를 입수할 때부터는 ‘2 rows affected’가 출력되었다. (INSERT IGNORE에서는 ‘0 rows affected’가 출력되었었다)

 
mysql> SELECT * FROM person;
+----+---------+---------+
| id | name    | address |
+----+---------+---------+
|  4 | Cynthia | Seoul   |
|  5 | James   | Incheon |
+----+---------+---------+
2 rows in set (0.00 sec)

id가 4, 5로 변하였다. 또한 James의 주소가 “Incheon”으로 변경되었다.

이를 ‘2 rows affected’와 함께 종합적으로 판단한다면 “REPLACE INTO”는 다음과 같이 작동하는 것을 알 수 있다.

  • 중복 키 오류 발생 시 기존 레코드를 삭제 -> 첫 번째 레코드가 affected되었음
  • 이후 새로운 레코드를 입력 -> 두 번째 레코드가 affected되었음

그래서 ‘2 rows affected’가 출력되었다.

새로운 레코드가 입력되면서 AUTO_INCREMENT 컬럼의 값이 매번 새롭게 발급되었다.

“REPLACE INTO”는 그다지 좋은 방법이 아닌데 앞서 이야기한 것처럼 AUTO_INCREMENT는 레코드를 식별할 수 있는 id로 사용되는데 중복 발생 시 id가 변경되기 때문이다.

3-3) ON DUPLICATE UPDATE

ON DUPLICATE UPDATE의 장점은 중복 키 오류 발생 시 사용자가 원하는 값을 직접 설정할 수 있다는 점이다.

우선 기본적인 사용 방법을 보자.

마찬가지로 테이블을 drop후 새로 생성했다.

mysql> INSERT INTO person VALUES (NULL, 'James', 'Seoul')
           ON DUPLICATE KEY UPDATE address = VALUES(address);
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO person VALUES (NULL, 'Cynthia', 'Yongin')
           ON DUPLICATE KEY UPDATE address = VALUES(address);
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO person VALUES (NULL, 'James', 'Seongnam')
           ON DUPLICATE KEY UPDATE address = VALUES(address);
Query OK, 2 rows affected, 1 warning (0.01 sec)

mysql> INSERT INTO person VALUES (NULL, 'Cynthia', 'Seoul')
           ON DUPLICATE KEY UPDATE address = VALUES(address);
Query OK, 2 rows affected, 1 warning (0.00 sec)

mysql> INSERT INTO person VALUES (NULL, 'James', 'Incheon')
           ON DUPLICATE KEY UPDATE address = VALUES(address);
Query OK, 2 rows affected, 1 warning (0.01 sec)

(‘2 rows affected’의 의미는 아래 내용을 읽고 각자 생각해보자)

SELECT 결과를 보자.

mysql> SELECT * FROM person;
+----+---------+---------+
| id | name    | address |
+----+---------+---------+
|  1 | James   | Incheon |
|  2 | Cynthia | Seoul   |
+----+---------+---------+
2 rows in set (0.00 sec)

이번에는 id 값이 최초 입수된 레코드의 값 그대로이다. 하지만 address의 값이 마지막에 입수한 레코드로 변경되어 있다.

 

INSERT INTO ... ON DUPLICATE KEY UPDATE의 장점은 중복 발생 시 필드의 값을 내 맘대로 UPDATE할 수 있다는 점이다.

id 필드만 놓고 보면 INSERT IGNORE와 동일하지만 address의 값이 변경된 것이 INSERT IGNORE와 INSERT INTO ... ON DUPLICATE KEY UPDATE의 차이점이다.

ON DUPLICATE KEY UPDATE를 이용하면 재미있는 것들을 할 수 있다. 중복 레코드가 총 몇 번이나 입수되었는지를 기록해보자.

이를 위해 inserted_cnt 필드를 추가하였다.

CREATE TABLE person
(
  id INT NOT NULL AUTO_INCREMENT,
  name VARCHAR(20),
  address VARCHAR(255),
  inserted_cnt INT, -- 레코드가 몇 번 입수되었는지 확인용 필드
  PRIMARY KEY (id),
  UNIQUE INDEX (name)
);
mysql> INSERT INTO person VALUES (NULL, 'James', 'Seoul', 1)
           ON DUPLICATE KEY UPDATE inserted_cnt = inserted_cnt + 1;
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO person VALUES (NULL, 'Cynthia', 'Yongin', 1)
           ON DUPLICATE KEY UPDATE inserted_cnt = inserted_cnt + 1;
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO person VALUES (NULL, 'James', 'Seongnam', 1)
           ON DUPLICATE KEY UPDATE inserted_cnt = inserted_cnt + 1;
Query OK, 2 rows affected (0.00 sec)

mysql> INSERT INTO person VALUES (NULL, 'Cynthia', 'Seoul', 1)
           ON DUPLICATE KEY UPDATE inserted_cnt = inserted_cnt + 1;
Query OK, 2 rows affected (0.00 sec)

mysql> INSERT INTO person VALUES (NULL, 'James', 'Incheon', 1)
           ON DUPLICATE KEY UPDATE inserted_cnt = inserted_cnt + 1;
Query OK, 2 rows affected (0.00 sec)

SELECT를 해 보면 inserted_cnt에는 해당 중복 값이 몇 번 INSERT 시도가 되었는지 기록되어 있을 것이다.

mysql> SELECT * FROM person;
+----+---------+---------+--------------+
| id | name    | address | inserted_cnt |
+----+---------+---------+--------------+
|  1 | James   | Seoul   |            3 |
|  2 | Cynthia | Yongin  |            2 |
+----+---------+---------+--------------+
2 rows in set (0.00 sec)

inserted_cnt 필드의 값을 보면 알겠지만, 레코드가 몇 번 입수되었는지 저장되어 있다.

주의해야 할 점은 새로온 레코드의 값으로 UPDATE하고자 할 때는 항상 “VALUES(column)”과 같이 VALUES()로 감싸야 한다는 점이다. 만약 다음과 같이 VALUES()` 없이 필드명만 사용하는 것은 기존 레코드의 컬럼 값을 의미하게 된다.

INSERT INTO person VALUES (NULL, 'James', 'Incheon')
    ON DUPLICATE KEY UPDATE address = address;

따라서 address의 값이 UPDATE되지 않는다.

 

 

Origin

https://jason-heo.github.io/mysql/2014/03/05/manage-dup-key2.html

 

MySQL 중복 레코드 관리 방법 (INSERT 시 중복 키 관리 방법 (INSERT IGNORE, REPLACE INTO, ON DUPLICATE UPDATE))

Test에 사용된 MySQL 버전 목차 1. 개요 MySQL에는 아래 3가지 방법을 이용하여 중복 레코드를 관리할 수 있다. INSERT IGNORE ... REPLACE INTO ... INSERT INTO ... ON DUPLICATE UPDATE 각 방법의 특징을 요약하면 다음

jason-heo.github.io

 

728x90

At Intuit, we faced the challenge of migrating our 27TB Database from Oracle to Postgres which is used by our Monolithic Application consisting of around 600 complex business workflows. While migrating to Postgres, we discovered that Postgres does not support Global Indexes and since we had some partitioned tables, we were facing performance issues if we don’t give the Partition Key as part of the where clause for SQL queries.

In this blog post, we will explore how to use Hibernate Filters and Inspector to dynamically add partition keys to your queries, which can help in mitigating the performance issues.

Using Hibernate Filters

Hibernate filters are a powerful feature that allows you to define conditions that are automatically applied to all queries for a specific entity class. This can be useful in a variety of situations, including adding partition keys to your queries.

To use Hibernate filters, you need to define the filter using the @FilterDef and @Filter annotations in your entity classes. For example, to define a filter that adds a countrycolumn to the WHERE clause of all queries for a User table, you could use the following code:

@Entity
@Table(name = "user_table")
@FilterDef(name = "countryFilter", parameters = @ParamDef(name="country", type = String.class))
@Filter(name = "countryFilter", condition = "country = :country")
@Getter
@Setter
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    private String country;

    // Other fields
}

Once you have defined the Hibernate filters, you can enable them at session level in your application. To do this, you can use the enableFilter() method of the Session class and set the parameter values using the setParameter() method. We can also create an Aspect to enable filter for all sessions. For example, to enable the "countryFilter" filter defined above and set the country parameter, you could use the following code:

Session session = entityManager.unwrap(Session.class);
session.enableFilter("countryFilter").setParameter("country", "India");

This code enables the “countryFilter” filter for the current session and sets the country parameter. Any queries executed using this Session will include the country = 'India' condition in the WHERE clause. This will ensure that only records for the specified country are returned by the query.

Problems with Hibernate Filter

Filters are not applied to Update/Delete queries. So to solve this problem we used Hibernate Statement Inspector to manually modify the Update and Delete queries.

Note: Hibernate team is discussing on a native solution for update/delete queries to include partition key, but looks like it is not implemented at the time of writing this blog. You can learn more about that here.

Using Hibernate Statement Inspector

Hibernate Statement Inspector can be used to manually modify the SQL statements generated by Hibernate. This can be useful in situations where you need to modify the SQL statements generated by Hibernate, such as adding partition keys to the WHERE clause of update or delete queries.

For example, to modify the WHERE clause of update and delete queries for a User table to include the country column as a partition key, you could use the following inspector class:

public class SQLStatementInspector implements StatementInspector {

    @Override
    public String inspect(String sql) {
        if (sql.startsWith("update user_table") || sql.startsWith("delete from user_table")) {
            sql = sql + " where country = '" + "India" + "'";
        }
        return sql;
    }
}

You can enable the inspector by adding this property:

<property
    name="hibernate.session_factory.statement_inspector"
    value="com.example.learning.SQLStatementInspector"
/>

Conclusion

Using this combination of Hibernate Filters and Inspector, we were able to solve the critical partitioning issue we faced while migrating to Postgres.

You can see the complete implementation here: https://github.com/Akshit97/hibernate-filter

728x90

동적 데이터 소스와 스키마 이름

API를 개발할 때 Java Spring으로 프로젝트를 진행하고 싶었는데 어려움이 있었다.

해당 문제를 해결하고 세미나에서 발표한 내용에 대해 정리한다.

  • 모든 아키텍처나 코드는 실제 서비스와 무관하며 설명을 위해 만든 부분임을 알린다.

API를 Java Spring으로 할 수 없었던 이유는 서비스의 DB구조 때문이었다.

구조를 보면 DB 서버를 여러 대로 샤딩(Sharding) 하고 있고, 스키마도 분산되어 있다.

데이터를 저장할 때 유저가 속한 지역별로 데이터를 특정 DB서버 특정 스키마에 저장해서 사용한다.

  • board_01에서 01을 파티션이라고 칭한다.

특정 유저의 정보가 어떤 DB 서버 몇 번째 파티션(스키마) 에 저장되어 있는지는 Region DB에 저장되어 있고, DB를 조회할 때마다 매번 Region DB를 질의하면 비효율적이므로 JWT에 발급해서 사용한다.

즉, 정리하면 아래 두 가지 작업을 처리했어야 했다.

  1. DB 서버 매핑 (Sharding)
  2. 스키마명 매핑

기존 프로젝트에서 처리한 방식

PHP 프로젝트

PHP는 요청이 들어올 때마다 실행되는 모델이기 때문에 Thread-Safety에 대한 걱정이 없었다. 그래서 PHP 스크립트가 실행될 때 JWT에 있는 값으로 DB 설정에 반영해주면 되었다.

NodeJS + Sequelize

NodeJS에서는 Entity 모델을 런타임 중 동적으로 생성할 수 있었다. 그래서 Map에 모델 객체들을 미리 생성하고 특정 DB 서버로의 커넥션을 부여했다.

즉, JWT에 있는 값을 기반으로 모델을 꺼내서 사용하면 된다. 모델을 DB 서버 수 x 파티션 수 만큼 메모리에 올리는 문제가 있었지만 처리는 가능했다.

JPA를 사용할 수 없는 이유

Spring 컨테이너 안에서 JPA를 사용하는 경우 위 예시처럼 DB 정보와 함께 모델을 매번 생성하는 게 불가능하다.

아래와 같이 DataSource를 싱글톤 빈으로 등록해서 사용해야 한다.

Schema name도 아래와 같이 애노테이션에 명시하면 앱이 실행될 때 Proxy를 생성한다.

그래서 DB 서버랑 스키마 명을 동적으로 어떻게 매핑할 수 있을 지에 대한 확신이 없었다.

팀에서는 이 문제 때문에 Java로 개발을 못하고 있었고, 자바로 개발된 프로젝트가 1개 있었는데 전부 JdbcTemplate을 사용했다.

  • DataSource를 DB 서버 개수만큼 생성
  • 파티션을 포함한 스키마명은 sql에 명시

정리하면 JPA를 사용하지 못하고 있었고, JdbcTemplate을 사용하더라도 DB 서버가 늘어나면 그것에 맞게 DataSource 개수도 추가해줘야 하는 문제가 있었다.

이 문제를 해결해야 했다.

1. DB Sharding

샤딩의 다양한 기법에 대해서는 아래 포스팅에서 정리했었다.

아래에서 설명할 방법은 앱 서버 수준에서 샤딩 DB를 매핑하는 방법이다.

AbstractRoutingDataSource

정보나 라이브러리를 찾으면서 삽질을 하던 중 Spring Jdbc에서 AbstractRoutingDataSource 라는 클래스를 확인할 수 있었다.

아래는 공식문서이다.

클래스 설명을 보면 Lookup key에 기반하여 Datasources 중에 1개에 대한 getConnection()을 호출해주는 클래스이다. (일반적으로 Thread에 할당된 TransactionContext를 사용한다고 명시되어있다.)

세부 구현을 살펴보면 determineTargetDataSources()라는 메서드로 DataSource를 선택한다.

해당 메서드를 보면 determineCurrentLookupKey()가 데이터 소스 Map의 Key가 되어서 데이터 소스를 선택하는 방식이다.

그래서 'JWT에 있는 DB Host를 LookupKey로 해서 Connection을 하면 되지 않을까?!'라는 생각을 가지게 되었다.

ThreadLocal

내용을 정리해보면 추상 메서드인 determineCurrentLookupKey()를 구현하면 되었다. 하지만 해당 메서드는 파라미터가 없고, getConnection() 내부에서 호출하는 방식이다.

Key를 정적인 필드에서 가져와야 했고, 각 요청(사용자)마다 다른 Key를 가져와야 했다.

그래서 ThreadLocal을 선택하게 되었다.

  • Request Scope Bean vs ThreadLocal을 검토 하다가 ThreadLocal을 선택하게 되었다.
    • Request Scope의 Bean도 내부적으로 ThreadLocal을 사용
    • ThreadLocal은 샤딩 솔루션에 많이 사용되고 있었다.

아래는 AbstractRoutingDataSource의 구현체이다.

class MultiDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DBContextHolder.getIp();
    }
}

아래와 같이 ThreadLocal에 정적으로 접근할 수 있는 Holder 클래스를 만든다.

public class DBContextHolder {
    private static final ThreadLocal<DbInfo> threadLocal = new ThreadLocal();

    public static void setDbInfo(String ip, String partition) {
        DbInfo dbInfo = new DbInfo(ip, partition);
        threadLocal.set(dbInfo);
    }

    public static DbInfo getDbInfo() {
        DbInfo dbInfo = threadLocal.get();
        if(dbInfo == null) {
            throw new IllegalStateException("DbInfo가 존재하지 않습니다.");
        }
        return dbInfo;
    }

    public static String getIp() {
        DbInfo dbInfo = getDbInfo();
        return dbInfo.ip();
    }

    public static String getPartition() {
        DbInfo dbInfo = getDbInfo();
        return dbInfo.partition();
    }

    public static void clear() {
        threadLocal.remove();
    }
}

DbInfo 클래스에는 host와 partition을 담는다.

public record DbInfo(String ip, String partition) {
}

지금까지 샤딩된 DataSource를 선택하는 코드를 작성했다.

MultiDataSourceManager

런타임 중에 AbstractRoutingDataSource의 dataSource를 추가할 수 있어야 한다.

AbstractRoutingDataSource를 관리하는 클래스를 하나 만들자.

@Slf4j
@Configuration
public class MultiDataSourceManager {
    // key = hostIp, value = DataSource
    // 동시성을 보장해야 하므로 ConcurrentHashMap을 사용한다.
    private final Map<Object, Object> dataSourceMap = new ConcurrentHashMap<>();

    private final AbstractRoutingDataSource multiDataSource;
    private final DataSourceCreator dataSourceCreator;

    public MultiDataSourceManager(DataSourceCreator dataSourceCreator) {
        MultiDataSource multiDataSource = new MultiDataSource();
        // AbstractRoutingDataSource의 targetDataSources를 지정
        multiDataSource.setTargetDataSources(dataSourceMap);
        // Key 대상이 없을 경우 호출되는 DataSource 지정 (해당 프로젝트에서는 Key가 없으면 예외가 터지도록 설계)
        multiDataSource.setDefaultTargetDataSource(dataSourceCreator.defaultDataSource());
        this.multiDataSource = multiDataSource;
        this.dataSourceCreator = dataSourceCreator;
    }

    @Bean
    public AbstractRoutingDataSource multiDataSource() {
        return multiDataSource;
    }

    public void addDataSourceIfAbsent(String ip) {
        if (!this.dataSourceMap.containsKey(ip)) {
            DataSource newDataSource = dataSourceCreator.generateDataSource(ip);
            try (Connection connection = newDataSource.getConnection()) {
                dataSourceMap.put(ip, newDataSource);
                // 실제로 사용하는 resolvedTargetDataSource에 반영하는 코드
                multiDataSource.afterPropertiesSet();
            } catch (SQLException e) {
                throw new IllegalArgumentException("Connection failed.");
            }
        }
    }
}

이제 최초 JPA EntityLoading 시 필요한 defaultDataSource를 만들어야 한다.

추가로 hostIp를 입력받아서 DataSource를 만드는 책임도 아래 클래스에서 수행한다.

@Configuration
@RequiredArgsConstructor
public class DataSourceCreator {
    private final DBProperties dbProperties;

    public DataSource generateDataSource(String ip) {
        HikariConfig hikariConfig = initConfig(ip);
        return new HikariDataSource(hikariConfig);
    }

    public DataSource defaultDataSource() {
        String defaultHostIp = dbProperties.getDefaultHostIp();
        String defaultHostPartition = dbProperties.getDefaultPartition();

        HikariConfig hikariConfig = initConfig(defaultHostIp);
        HikariDataSource datasource = new HikariDataSource(hikariConfig);
        // JPA 엔터티 최초 로딩 시 파티션 보관 필요
        DBContextHolder.setDbInfo(defaultHostIp, defaultHostPartition);
        return datasource;
    }

    private HikariConfig initConfig(String hostIp) {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(getConnectionString(hostIp));
        hikariConfig.setUsername(dbProperties.getUsername());
        hikariConfig.setPassword(dbProperties.getPassword());
        hikariConfig.setDriverClassName(dbProperties.getDriver());
        return hikariConfig;
    }

    public String getConnectionString(String hostname) {
        StringBuilder sb = new StringBuilder()
            .append("jdbc:mysql://")
            .append(hostname)
            .append(":").append(dbProperties.getPort())
            .append("/").append(dbProperties.getDefaultSchema());
        return sb.toString();
    }
}

AbstractRoutingDataSource와 관련된 코드는 모두 작성했다.

Util 클래스 제공

이제 라이브러리의 사용처에서 ThreadLocal에 DbInfo를 넣어주고, DataSource가 없는 경우 생성해줘야 한다.

그런데 사용처에서는 ThreadLocal에 대해 직접적으로 다루지 않게 하고 싶었고, 편의성을 위해 라이브러리의 개념에도 접근할 필요가 없게 하고 싶었다.

  • ThreadLocal은 다소 위험한 개념
    • ThreadPool을 사용하는 경우 이전 Thread의 정보를 가져와서 잘못 쿼리가 나갈 수 있음
  • 라이브러리의 개념에 접근해야 한다면 매번 라이브러리를 설명해야 함

그래서 사용자 편의를 위해 유틸성 클래스를 제공하기로 했다! 작성한 코드에서는 Filter, AOP 두 방식을 지원한다.

Filter로 처리

아래는 Filter로 처리를 구현한 코드이다.

@RequiredArgsConstructor
public class ShardingFilter extends OncePerRequestFilter {
    private final MultiDataSourceManager multiDataSourceManager;

    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        DbInfo dbInfo = JwtParser.getDbInfoByRequest(request);
        DBContextHolder.setDbInfo(dbInfo);

        // DataSource가 존재하지 않을 경우에 새로 생성해준다.
        multiDataSourceManager.addDataSourceIfAbsent(dbInfo.ip());

        try {
            filterChain.doFilter(request, response);
        } finally {
            // ThreadPool을 사용하기 때문에 다른 요청에서 재사용할 수 없도록 반드시 clear()를 호출해야 한다.
            DBContextHolder.clear();
        }
    }
}

해당 필터를 사용해서 요청이 들어왔을 때 ThreadLocal에 DBInfo를 세팅하고 DataSource를 생성하는 로직을 비즈니스 로직에서 분리할 수 있다.

AOP로 처리

Batch 서버와 같이 Web 요청이 없는 경우 Filter로 처리가 불가능하다. 그래서 AOP 방식도 지원한다.

특정 메서드에 아래 애노테이션만 붙이면 샤딩을 처리할 수 있도록 처리하자.

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharding {
}

아래와 같이 AOP 모듈을 구현한다.

@Aspect
@Component
@ConditionalOnBean(LoadDbInfoProcess.class)
@RequiredArgsConstructor
public class DataSourceAdditionAspect {
    private final LoadDbInfoProcess loadDbInfoProcess;
    private final MultiDataSourceManager multiDataSourceManager;

    @Around("@annotation(com.violetbeach.sharding.module.aop.Sharding)")
    public void execute(ProceedingJoinPoint joinPoint) throws Throwable {
        DbInfo dbInfo = loadDbInfoProcess.loadDbInfo();
        DBContextHolder.setDbInfo(dbInfo);
        try {
            // DataSource가 존재하지 않을 경우에 새로 생성해준다.
            multiDataSourceManager.addDataSourceIfAbsent(dbInfo.ip());
            joinPoint.proceed();
        } finally {
            DBContextHolder.clear();
        }
    }

}

아래는 DBInfo를 가져오는 인터페이스이다.

public interface LoadDbInfoProcess {
    DbInfo loadDbInfo();
}

구현체는 사용자가 원하는 방식으로 구현할 수 있다.

ThreadLocal을 사용해도 되고, Security Context를 사용하거나 Spring Batch를 사용한다면 JobScope에서 ip와 partition을 꺼내는 등 원하는 방식으로 구현하면 된다.

아래는 테스트 결과이다.

이렇게 해서 샤딩 문제가 해결되었다!

2. Dynamic Schema Name

1가지 문제가 남아있다. 스키마명을 jwt에 있는 partition을 사용해서 바꿔야 한다.

org.hibernate.resource.jdbc.spi.StatementInspector를 구현하면 된다. StatementInspector를 사용하면 기존 sql의 일부를 변경하거나 완전히 대체할 수 있다.

public class PartitionInspector implements StatementInspector {
    @Override
    public String inspect(String sql) {
        String partition = DBContextHolder.getPartition();
        return sql.replaceAll("@@partition_number", partition);
    }
}

이제 HibernatePropertiesCustomizer를 빈으로 등록하면 된다.

@Configuration
@RequiredArgsConstructor
public class HibernateConfig {
    @Bean
    public HibernatePropertiesCustomizer hibernatePropertiesCustomizer() {
        return (properties) -> {
            properties.put(AvailableSettings.STATEMENT_INSPECTOR, new PartitionInspector());
        };
    }
}

Entity는 아래와 같이 설정한다.

@Entity
@Table(schema = "member_@@partition_number", name = "member")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Member {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String username;

    public Member(String username) {
        this.username = username;
    }
}

MySQL에서는 @Table 애노테이션의 schema 옵션이 동작하지 않는다. 대신 catalog 옵션을 사용해야 한다.

아래는 테스트 결과이다.

쿼리도 문제 없이 나가고 DB 반영도 잘된다.

이제 Dyanmic Schema name 문제까지도 모두 해결되었다.

결과

다음은 통합 테스트의 결과이다. 전부 PASS했다.

이후 수행한 nGrinder로 운영 환경에서의 테스트도 잘 통과했고, 지금은 1년 반 넘게 문제 없이 사용하고 있다.

번외 1 - afterPropertiesSet

MultiDataSourceManager에서 데이터소스를 추가할 때마다 AbstractRoutingDataSource의 afterPropertiesSet() 메서드를 호출하고 있다.

해당 메서드는 아래와 같이 설정한 targetDataSources를 실제로 동작할 때 사용하는 resolvedDataSources에 반영하는 메서드이다.

  • resolvedDataSources에 DataSource를 직접 추가할 수 없다. (가시성)
  • 그래서 targetDataSources에 DataSource를 추가한 후 반드시 afterPropertiesSet()을 호출해야 한다.

afterPropertiesSet()은 InitializingBean의 빈이 등록되었을 때 실행되는 메서드이다. 런타임 중 DataSource를 추가로 생성하는 상황에서 적절한 의미를 외부로 뿜을 수 없었다.

나는 이부분을 Spring-jdbc에 PR을 올려서 이 부분의 가독성 문제를 언급했고 해결방안으로 메서드 추출(

refresh

initialize)을 제시했다.

해당 PR은 main 브랜치로 머지되었고 Spring Framework 6.1.0부터 반영된다고 한다.

번외 2 - 비동기

중간에 비동기 쓰레드를 사용할 경우 ThreadLocal의 데이터를 비동기 쓰레드로 옮겨줘야 한다.

해당 동작을위해 TaskDecorator의 구현체를 제공한다.

public class DBContextHolderDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        DbInfo dbInfo = DBContextHolder.getDbInfo();

        return () -> {
            DBContextHolder.setDbInfo(dbInfo);
            try {
                runnable.run();
            } finally {
                DBContextHolder.clear();
            }
        };
    }
}

ThreadPoolTaskExecutor에서는 아래와 같이 Decorator를 등록할 수 있다.

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setTaskDecorator(new DBContextHolderDecorator());

정리

코드는 실제 프로젝트에 적용된 코드는 아니고 설명을 위해 간소화된 코드입니다.

코드는 아래에서 확인할 수 있습니다.

728x90

데이터베이스 분할에 대한 고민

전체 데이터베이스에 모든 데이터를 한 테이블 혹은 데이터베이스에서 관리하기가 어려워진다. 데이터베이스 볼륨이 커지면 커질수록 데이터베이스 읽기/쓰기 성능은 감소할 것이고, 데이터베이스가 병목 지점이 될 것이다. 따라서 이를 적절히 분할할 필요가 있다. 데이터베이스를 분할하는 방법은 크게 샤딩(sharding) 파티셔닝(partitioning)이 있다. 이 두 가지 기술은 모두 거대한 데이터셋을 서브셋으로 분리하여 관리하는 방법이다. 이번 포스팅에서는 이 둘의 개념과 차이점에 대해 알아본다.

파티셔닝이란?

MySQL 기준으로 기술되었다.

파티셔닝은 매우 큰 테이블을 여러개의 테이블로 분할하는 작업이다. 큰 데이터를 여러 테이블로 나눠 저장하기 때문에 쿼리 성능이 개선될 수 있다. 이때, 데이터는 물리적으로 여러 테이블로 분산하여 저장되지만, 사용자는 마치 하나의 테이블에 접근하는 것과 같이 사용할 수 있다는 점이 특징이다. 파티셔닝은 MySQL 단에서 자체적으로 지원하는 기능이다.

MySQL 에서 파티셔닝을 지원하는 스토리지 엔진은 InnoDB와 NDB이며, MyISAM은 파티셔닝을 지원하지 않는다.

MySQL 파티셔닝 종류

출처: https://docs.oracle.com/cd/B12037_01/server.101/b10743/partconc.htm

파티셔닝 종류는 Oracle과 MySQL 공식문서에서 소개한 4가지 방식을 간단하게 설명하겠다.

List Partitioning

데이터 값이 특정 목록에 포함된 경우 데이터를 분리한다. 위 그림 처럼 특정 지역별로 데이터를 분할할 때 사용할 수 있겠다.

Range Partitioning

데이터를 특정 범위 기준으로 분할할 때 사용한다. 위 처럼 1~2월, 3~4월, 5~6월 … 으로 데이터를 분리할 때 사용할 수 있다.

Hash Partitioning

해시 함수를 사용하여 데이터를 분할할 때 사용한다. 특정 컬럼의 값을 해싱하여 저장할 파티션을 선택한다. MySQL 공식 문서에 따르면, 여러 컬럼으로 해싱하는 것은 크게 권장하지 않는다고 한다 (참고).

Composite Partitioning

위 파티셔닝 종류 중 두개 이상을 사용하는 방식이다.

샤딩이란?

샤딩은 동일한 스키마를 가지고 있는 여러대의 데이터베이스 서버들에 데이터를 작은 단위로 나누어 분산 저장하는 기법이다. 이때, 작은 단위를 샤드(shard)라고 부른다.

어떻게 보면 샤딩은 수평 파티셔닝의 일종이다. 차이점은 파티셔닝은 모든 데이터를 동일한 컴퓨터에 저장하지만, 샤딩은 데이터를 서로 다른 컴퓨터에 분산한다는 점이다. 물리적으로 서로 다른 컴퓨터에 데이터를 저장하므로, 쿼리 성능 향상과 더불어 부하가 분산되는 효과까지 얻을 수 있다. 즉, 샤딩은 데이터베이스 차원의 수평 확장(scale-out)인 셈이다.

샤딩은 위와 같이 물리적으로 분산된 환경에서 사용되는 기법으로 데이터베이스 차원이 아닌 애플리케이션 레벨에서 구현하는 것이 일반적이다. 다만 샤딩을 플랫폼 차원에서 제공하는 시도가 많다고 한다. Naver d2 포스팅에 따르면, Hibernate Shards와 같이 애플리케이션 서버에서 동작하는 형태, CUBRID SHARD, Spock Proxy, Gizzard 와 같이 미들 티어(middle tier)에서 동작하는 형태, nStore나 MongoDB와 같이 데이터베이스 자체에서 샤딩을 제공하는 형태로 나뉜다고 한다.

(고등학교 때 MERN stack 공부하면서, mongodb 샤딩이 대체 뭐지?… 하고 그냥 넘어갔는데 드디어 공부했다!)

주의점

데이터를 물리적으로 독립된 데이터베이스에 각각 분할하여 저장하므로, 여러 샤드에 걸친 데이터를 조인하는 것이 어렵다. 또한, 한 데이터베이스에 집중적으로 데이터가 몰리면 Hotspot이 되어 성능이 느려진다. 따라서 데이터를 여러 샤드로 고르게 분배하는 것이 중요하다. 또 Celebrity Problem 등 다양한 문제가 존재한다. 자세한 내용은 이전에 작성한 [가상면접 사례로 배우는 대규모 시스템 설계 기초] Chap01. 사용자 수에 따른 규모 확장성 포스팅을 참고하자.

샤딩 종류

샤딩의 종류는 다양하지만, 크게 Hash Sharding, Range Sharding 두 가지를 알아보겠다.

Hash Sharding

출처: https://medium.com/nerd-for-tech/all-about-database-sharding-scaling-up-the-database-3b6172491cd

Hash Sharding 중 나머지 연산을 사용한 Modular Sharding 을 알아본다. Modular Sharding은 PK값의 모듈러 연산 결과를 통해 샤드를 결정하는 방식이다. 총 데이터베이스 수가 정해져있을 때 유용하다. 데이터베이스 개수가 줄어들거나 늘어나면 해시 함수도 변경해야하고, 따라서 데이터의 재 정렬이 필요하다.

Range Sharding

출처: https://medium.com/nerd-for-tech/all-about-database-sharding-scaling-up-the-database-3b6172491cd

PK 값을 범위로 지정하여 샤드를 지정하는 방식이다. 예를 들어 PK가 1~1,000 까지는 1번 샤드에, 1,001~2,000 까지는 2번 샤드에, 2,001~ 부터는 3번 샤드에 저장할 수 있다. Hash Sharding 대비 데이터베이스 증설 작업에 큰 리소스가 소요되지 않는다. 따라서 급격히 증가할 수 있는 성격의 데이터는 Range Sharding 을 사용함이 좋아보인다. 이런 특징으로 Range Sharding은 Dynamic Sharding 으로도 불린다.

다만, 이렇게 기껏 분산을 시켜놨는데 특정한 데이터베이스에만 부하가 몰릴 수 있다. 예를 들어 페이스북 게시물을 Range Sharding 했다고 가정해보자. 대부분의 트래픽은 최근에 작성한 게시물에서 발생할 것이다. 위 그림에서는 2, 3번 샤드에만 부하가 몰리는 것이다. 부하 분산을 위해 데이터가 몰리는 DB는 다시 재 샤딩(re-sharding)하고, 트래픽이 저조한 데이터베이스는 다시 통합하는 작업이 필요할 것이다.

참고

+ Recent posts