ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • debezium server 정리 및 트러블슈팅
    Programming/java 2023. 2. 6. 00:27

    개요

    CDC(Change Data Capture)

    • CDC는 변경 데이터 켑처이라는 의미로 주로 데이터베이스와 같은 데이터 스토어의 데이터 변경을 포착하여 ETL, 감사(audit), 캐싱과 같은 다양한 후속 처리를 하는데 사용한다.

    Debezium

    • 이름은 DB와 주기율표에 많은 요소 이름에 사용되는 -ium 접미사의 조합
    • Debezium은 다양한 데이터베이스를 모니터링하며 모든 이벤트를 대상으로 변경 사항을 분을 거의 즉시 응답하는 분산 오픈 소스 플랫폼이다.
      • debezium은 결함과 실패를 허용하도록 설계되었고 수행하는 유일한 방법은 분산 시스템을 사용하는 것이다. Debezium은 모니터링 프로세스 또는 커넥터를 여러 시스템에 분산시킬 수 있고 문제가 발생하면 커넥터를 다시 시작할 수 있다.
    • 카카오커머스, 라인커머스, 에이블리, 트리바고 등 주로 커머셜 도메인에서 자주 사용됨
    • PostgreSQL, MongoDB, MySQL, Oracle, AWS RDS, Db2, SQL Server, Cassandra를 지원한다.
    • 기본적으로 debezium은 카프카의 커넥터 기반 플러그인으로 제공되며 최근 Debezium Server라는 이름으로 독립된 형태로 Embedded Engine을 사용할 수 있으며 카프카 대신 AWS Kinesis, GCP Pub/Sub, Apache Pulsar or Redis 등 다양한 스트림을 대상으로 데이터 베이스 소스를 내보낼 수 있다. 아직 인큐베이팅 상태
      • 카프카의 커넥터 형태가 아니라 자바 애플리케이션의 라이브러리로 바로 사용
    • Debezium이 중단되었다가 재시작되면 마지막에 저장된 binlog offset부터 다시 읽어 카프카 토픽에 저장한다.

    Local 환경 셋팅

    • 카프카가 아닌 키네시스를 사용하기 위해서 debezium server를 사용
    • 테스트 환경 구축 및 테스트 목적
    • Docker를 통해 Mysql과 Redis를 띄우고 Mysql의 변경사항을 Redis에 적재하도록 함

    설치

    • 아래 저장소에서 버전 확인 후 설치
      • 2023-02-05 기준 2.1.2 Final 버전이 가장 최근 버전
    • tar.gz 파일을 받고 압축을 해제

    io/debezium/debezium-server-dist

     

    Central Repository: io/debezium/debezium-server-dist

     

    repo1.maven.org

    • 기본적으로 conf하위에 application.properties 파일에 설정값을 지정하고 ./run.sh 스크립트를 통해서 Debezium 구동

    application.properties

    debezium.sink.type=redis
    debezium.sink.redis.address=127.0.0.1:6379
    debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0
    debezium.source.database.hostname=127.0.0.1
    debezium.source.database.port=3306
    debezium.source.database.user=mysqluser
    debezium.source.database.password=mysqlpw
    debezium.source.database.dbname=inventory
    debezium.source.database.server.name=test1
    debezium.source.database.server.id=3461 #아이디값은 임의 값으로 지정
    debezium.source.database.include.list=inventory
    debezium.source.database.history.file.filename=data/history.dat
    debezium.source.database.history=io.debezium.storage.file.history.FileDatabaseHistory
    quarkus.log.console.json=false
    

    Docker

    # 샘플 데이터가 저장되어있는 Mysql 이미지 run
    docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
    
    # Redis 이미지 run
    docker run --name redis -d -p 6379:6379 redis
    

    Mysql

    • mysql 권한부터 만들어야 한다.
    1. docker exec로 이미지 접속 mysql -uroot -p 으로 mysql cli띄우기→비밀번호는 mysql docker 이미지를 실행할때 변수값으로 전달한 비밀번호 MYSQL_ROOT_PASSWORD
    2. mysql> CREATE USER 'mysqluser'@'localhost' IDENTIFIED BY 'mysqlpw';
    3. mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysqluser';
    4. mysql> FLUSH PRIVILEGES;
    • REPLICATION CLIENT와 REPLICATION SLAVE는 CDC를 위한 권한
      • REPLICATION CLIENT – This privilege is required for CDC tasks only. In other words, full-load-only tasks don’t require this privilege.
      • REPLICATION SLAVE – This privilege is required for CDC tasks only. In other words, full-load-only tasks don’t require this privilege.
    • SHOW DATABASES와 RELOAD는 스냅샷을 위해 필요한 권한이다.
      • RELOAD - Enables the connector the use of the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks. This is used only when performing a snapshot.
      • SHOW DATABASES - Enables the connector to see database names by issuing the SHOW DATABASE statement. This is used only when performing a snapshot.
    • RELOAD 같은 경우는 스냅샷시 Lock을 걸기 위해 필요한 권한이다. snapshot.locking.mode 옵션을 none으로 설정하면 RELOAD권한이 없어도 스냅샷 기능을 사용할 수 있다. 다만 이 옵션은 정합성을 위해서 스냅샷이 동작 중에 스키마가 변경될 일이 없을 경우에만 권장된다.

     

    Redis

    • Redis의 key, value들을 Gui로 확인하기 위해서 맥 기준 앱스토어에서 Medis2라는 클라이언트를 설치한다

    • 커텍트를 눌러서 docker로 띄운 redis에 연결한다

    • Dbeaver와 같은 클라이언트를 통해서 mysql에 연결 후 테스트를 위해 테이블에 정보를 임의로 수정

    "payload":{
    "before":{"id":109,"name":"spare tire","description":"32 inch spare tire","weight":22.2},
    "after":{"id":109,"name":"spare tire","description":"28 inch spare tire","weight":22.2}
    
    • 페이로드로 데이터가 32 inch spare tire에서 28 inch spare tire로 바뀐 것을 확인할 수 있다.

    Kinesis

    appication.properties

    debezium.sink.type=kinesis
    debezium.sink.kinesis.region=ap-northeast-1
    debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0
    debezium.source.database.hostname=rds-host.net
    debezium.source.database.port=3306
    debezium.source.database.user=username
    debezium.source.database.password=password
    debezium.source.database.dbname=dbname
    debezium.source.database.server.name=debezium-v2
    debezium.source.database.server.id=31
    debezium.snapshot.new.tables=parallel
    debezium.source.snapshot.mode=schema_only # 초기적재를 하지 않고 띄우기 위해서 schema_only
    debezium.source.snapshot.locking.mode=none
    debezium.source.include.schema.changes=false
    debezium.source.table.include.list=item,user
    debezium.source.database.history.file.filename=data/history.dat
    debezium.source.database.history=io.debezium.storage.file.history.FileDatabaseHistory
    quarkus.log.console.json=false
    • 스냅샷 모드란 디비지움 커넥터가 처음 시작할 때 데이터의 빈로그 포지션과 테이블, 스키마를 읽고 기본값은 initial 이고 schema_only로 하면 스키마의 스냅샷만 가져온다 스냅샷 모드를 해야만 재시작했을 때 이미 저장해둔 빈로그 위치에서 다시 시작할 수 있다
    • 현재 설정값으로는 하나의 스트림 당 하나의 테이블로만 매칭되어있다.
    • 필요한 경우 하나의 스트림에 멀티 테이블 로그 ingest하도록 설정할 수 있다.(SMT 설정을 통해 가능)
    • 설정값에 server.name값에 맞게 스트리밍명을 설정해야함
      • ex)debezium-v2.테이블명
    • 파이어호스는 네이밍 규칙은 따로 필요없이 스트림과 이어주면 된다.

    TroubleShooting

    slave 중복 문제

    2022-08-29 12:38:02,327 ERROR [io.deb.pip.ErrorHandler] (mysql:3306) Producer failure: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin-changelog.122640' at 2669334, the last event read from '/rdsdbdata/log/binlog/mysql-bin-changelog.122640' at 2940252, the last byte read from '/rdsdbdata/log/binlog/mysql-bin-changelog.122640' at 2940252. Error code: 1236; SQLSTATE: HY000.
    	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1189)
    
    • server_id가 중복이 되서 생기는 문제 설정 값에서 서버아이디를 임의의 다른 값으로 수정함으로써 해결

    Kinesis 스트림 샤드 Rate 초과 문제

    2022-08-31 18:03:41,757 INFO  [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Rate exceeded for shard shardId-000000000002 in stream table_name under account 12345678. (Service: Kinesis, Status Code: 400, Request ID: f9276a90-a2ae-ce2dfd553c4b)', error = '{}': software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000002 in stream table_name under account 12345678. (Service: Kinesis, Status Code: 400, Request ID: f9276a90-c875-c0a8-a2ae-ce2dfd553c4b)
    	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:123)
    	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:79)
    	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:59)
    	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:40)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
    	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:77)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:39)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:64)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:34)
    	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:189)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:121)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:147)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:101)
    	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55)
    	at software.amazon.awssdk.services.kinesis.DefaultKinesisClient.putRecord(DefaultKinesisClient.java:1562)
    	at io.debezium.server.kinesis.KinesisChangeConsumer.handleBatch(KinesisChangeConsumer.java:113)
    	at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
    	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:827)
    	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
    	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:146)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    

    • 키네시스 스트림에 샤드를 늘려서 해결

    binary log index Error

    • 기존에 사용하던 server.name사용시 io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.에러 발생
    • 다른 이름으로 새롭게 스트림 생성 후 연결시 정상적으로 새로운 변경 정보만 가져오는 것

    max request size 문제

    Caused by: io.debezium.DebeziumException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3815143 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
    	at io.debezium.server.kafka.KafkaChangeConsumer.lambda$handleBatch$0(KafkaChangeConsumer.java:86)
    	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1064)
    	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949)
    	at io.debezium.server.kafka.KafkaChangeConsumer.handleBatch(KafkaChangeConsumer.java:83)
    	... 7 more
    Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3815143 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration
    
    • 토픽 메세지가 기본 값을 초과해서 생긴 문제
    • debezium.sink.kafka.producer.override.max.request.size 옵션에 적절한 값을 추가
    • 그런데도 설정값이 먹히지 않고 계속해서 같은 에러가 발생
    • 디비지움 카프카 커넥터 버전과 달리 debezium-server의 경우에는 옵션값에 override라는 이름이 생략해야 된다고 함
    • 따라서 debezium.sink.kafka.producer.max.request.size 옵션값으로 시도 후 정상 반영된 것 확인

    댓글

Copyright 2023. 은유 All rights reserved.