-
debezium server 정리 및 트러블슈팅Programming/java 2023. 2. 6. 00:27
개요
CDC(Change Data Capture)
- CDC는 변경 데이터 켑처이라는 의미로 주로 데이터베이스와 같은 데이터 스토어의 데이터 변경을 포착하여 ETL, 감사(audit), 캐싱과 같은 다양한 후속 처리를 하는데 사용한다.
Debezium
- 이름은 DB와 주기율표에 많은 요소 이름에 사용되는 -ium 접미사의 조합
- Debezium은 다양한 데이터베이스를 모니터링하며 모든 이벤트를 대상으로 변경 사항을 분을 거의 즉시 응답하는 분산 오픈 소스 플랫폼이다.
- debezium은 결함과 실패를 허용하도록 설계되었고 수행하는 유일한 방법은 분산 시스템을 사용하는 것이다. Debezium은 모니터링 프로세스 또는 커넥터를 여러 시스템에 분산시킬 수 있고 문제가 발생하면 커넥터를 다시 시작할 수 있다.
- 카카오커머스, 라인커머스, 에이블리, 트리바고 등 주로 커머셜 도메인에서 자주 사용됨
- PostgreSQL, MongoDB, MySQL, Oracle, AWS RDS, Db2, SQL Server, Cassandra를 지원한다.
- 기본적으로 debezium은 카프카의 커넥터 기반 플러그인으로 제공되며 최근 Debezium Server라는 이름으로 독립된 형태로 Embedded Engine을 사용할 수 있으며 카프카 대신 AWS Kinesis, GCP Pub/Sub, Apache Pulsar or Redis 등 다양한 스트림을 대상으로 데이터 베이스 소스를 내보낼 수 있다. 아직 인큐베이팅 상태
- 카프카의 커넥터 형태가 아니라 자바 애플리케이션의 라이브러리로 바로 사용
- Debezium이 중단되었다가 재시작되면 마지막에 저장된 binlog offset부터 다시 읽어 카프카 토픽에 저장한다.
Local 환경 셋팅
- 카프카가 아닌 키네시스를 사용하기 위해서 debezium server를 사용
- 테스트 환경 구축 및 테스트 목적
- Docker를 통해 Mysql과 Redis를 띄우고 Mysql의 변경사항을 Redis에 적재하도록 함
설치
- 아래 저장소에서 버전 확인 후 설치
- 2023-02-05 기준 2.1.2 Final 버전이 가장 최근 버전
- tar.gz 파일을 받고 압축을 해제
io/debezium/debezium-server-dist
- 기본적으로 conf하위에 application.properties 파일에 설정값을 지정하고 ./run.sh 스크립트를 통해서 Debezium 구동
application.properties
debezium.sink.type=redis debezium.sink.redis.address=127.0.0.1:6379 debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.database.hostname=127.0.0.1 debezium.source.database.port=3306 debezium.source.database.user=mysqluser debezium.source.database.password=mysqlpw debezium.source.database.dbname=inventory debezium.source.database.server.name=test1 debezium.source.database.server.id=3461 #아이디값은 임의 값으로 지정 debezium.source.database.include.list=inventory debezium.source.database.history.file.filename=data/history.dat debezium.source.database.history=io.debezium.storage.file.history.FileDatabaseHistory quarkus.log.console.json=false
Docker
# 샘플 데이터가 저장되어있는 Mysql 이미지 run docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9 # Redis 이미지 run docker run --name redis -d -p 6379:6379 redis
Mysql
- mysql 권한부터 만들어야 한다.
- docker exec로 이미지 접속 mysql -uroot -p 으로 mysql cli띄우기→비밀번호는 mysql docker 이미지를 실행할때 변수값으로 전달한 비밀번호 MYSQL_ROOT_PASSWORD
- mysql> CREATE USER 'mysqluser'@'localhost' IDENTIFIED BY 'mysqlpw';
- mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysqluser';
- mysql> FLUSH PRIVILEGES;
- REPLICATION CLIENT와 REPLICATION SLAVE는 CDC를 위한 권한
- REPLICATION CLIENT – This privilege is required for CDC tasks only. In other words, full-load-only tasks don’t require this privilege.
- REPLICATION SLAVE – This privilege is required for CDC tasks only. In other words, full-load-only tasks don’t require this privilege.
- SHOW DATABASES와 RELOAD는 스냅샷을 위해 필요한 권한이다.
- RELOAD - Enables the connector the use of the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks. This is used only when performing a snapshot.
- SHOW DATABASES - Enables the connector to see database names by issuing the SHOW DATABASE statement. This is used only when performing a snapshot.
- RELOAD 같은 경우는 스냅샷시 Lock을 걸기 위해 필요한 권한이다. snapshot.locking.mode 옵션을 none으로 설정하면 RELOAD권한이 없어도 스냅샷 기능을 사용할 수 있다. 다만 이 옵션은 정합성을 위해서 스냅샷이 동작 중에 스키마가 변경될 일이 없을 경우에만 권장된다.
Redis
- Redis의 key, value들을 Gui로 확인하기 위해서 맥 기준 앱스토어에서 Medis2라는 클라이언트를 설치한다
- 커텍트를 눌러서 docker로 띄운 redis에 연결한다
- Dbeaver와 같은 클라이언트를 통해서 mysql에 연결 후 테스트를 위해 테이블에 정보를 임의로 수정
"payload":{ "before":{"id":109,"name":"spare tire","description":"32 inch spare tire","weight":22.2}, "after":{"id":109,"name":"spare tire","description":"28 inch spare tire","weight":22.2}
- 페이로드로 데이터가 32 inch spare tire에서 28 inch spare tire로 바뀐 것을 확인할 수 있다.
Kinesis
appication.properties
debezium.sink.type=kinesis debezium.sink.kinesis.region=ap-northeast-1 debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.database.hostname=rds-host.net debezium.source.database.port=3306 debezium.source.database.user=username debezium.source.database.password=password debezium.source.database.dbname=dbname debezium.source.database.server.name=debezium-v2 debezium.source.database.server.id=31 debezium.snapshot.new.tables=parallel debezium.source.snapshot.mode=schema_only # 초기적재를 하지 않고 띄우기 위해서 schema_only debezium.source.snapshot.locking.mode=none debezium.source.include.schema.changes=false debezium.source.table.include.list=item,user debezium.source.database.history.file.filename=data/history.dat debezium.source.database.history=io.debezium.storage.file.history.FileDatabaseHistory quarkus.log.console.json=false
- 스냅샷 모드란 디비지움 커넥터가 처음 시작할 때 데이터의 빈로그 포지션과 테이블, 스키마를 읽고 기본값은 initial 이고 schema_only로 하면 스키마의 스냅샷만 가져온다 스냅샷 모드를 해야만 재시작했을 때 이미 저장해둔 빈로그 위치에서 다시 시작할 수 있다
- 현재 설정값으로는 하나의 스트림 당 하나의 테이블로만 매칭되어있다.
- 필요한 경우 하나의 스트림에 멀티 테이블 로그 ingest하도록 설정할 수 있다.(SMT 설정을 통해 가능)
- 설정값에 server.name값에 맞게 스트리밍명을 설정해야함
- ex)debezium-v2.테이블명
- 파이어호스는 네이밍 규칙은 따로 필요없이 스트림과 이어주면 된다.
TroubleShooting
slave 중복 문제
2022-08-29 12:38:02,327 ERROR [io.deb.pip.ErrorHandler] (mysql:3306) Producer failure: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin-changelog.122640' at 2669334, the last event read from '/rdsdbdata/log/binlog/mysql-bin-changelog.122640' at 2940252, the last byte read from '/rdsdbdata/log/binlog/mysql-bin-changelog.122640' at 2940252. Error code: 1236; SQLSTATE: HY000. at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1189)
- server_id가 중복이 되서 생기는 문제 설정 값에서 서버아이디를 임의의 다른 값으로 수정함으로써 해결
Kinesis 스트림 샤드 Rate 초과 문제
2022-08-31 18:03:41,757 INFO [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Rate exceeded for shard shardId-000000000002 in stream table_name under account 12345678. (Service: Kinesis, Status Code: 400, Request ID: f9276a90-a2ae-ce2dfd553c4b)', error = '{}': software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000002 in stream table_name under account 12345678. (Service: Kinesis, Status Code: 400, Request ID: f9276a90-c875-c0a8-a2ae-ce2dfd553c4b) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:123) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:79) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:59) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:40) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:77) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:39) at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:64) at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:34) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:189) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:121) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:147) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:101) at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55) at software.amazon.awssdk.services.kinesis.DefaultKinesisClient.putRecord(DefaultKinesisClient.java:1562) at io.debezium.server.kinesis.KinesisChangeConsumer.handleBatch(KinesisChangeConsumer.java:113) at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83) at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:827) at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:146) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829)
- 키네시스 스트림에 샤드를 늘려서 해결
binary log index Error
- 기존에 사용하던 server.name사용시 io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.에러 발생
- 다른 이름으로 새롭게 스트림 생성 후 연결시 정상적으로 새로운 변경 정보만 가져오는 것
max request size 문제
Caused by: io.debezium.DebeziumException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3815143 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. at io.debezium.server.kafka.KafkaChangeConsumer.lambda$handleBatch$0(KafkaChangeConsumer.java:86) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1064) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949) at io.debezium.server.kafka.KafkaChangeConsumer.handleBatch(KafkaChangeConsumer.java:83) ... 7 more Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3815143 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration
- 토픽 메세지가 기본 값을 초과해서 생긴 문제
- debezium.sink.kafka.producer.override.max.request.size 옵션에 적절한 값을 추가
- 그런데도 설정값이 먹히지 않고 계속해서 같은 에러가 발생
- 디비지움 카프카 커넥터 버전과 달리 debezium-server의 경우에는 옵션값에 override라는 이름이 생략해야 된다고 함
- 따라서 debezium.sink.kafka.producer.max.request.size 옵션값으로 시도 후 정상 반영된 것 확인
'Programming > java' 카테고리의 다른 글
가비지 컬렉션, 컬렉터(Garbage Collection)란? (0) 2020.03.01 추상클래스와 인터페이스의 공통점과 차이점 (4) 2019.11.03 오버로딩과 오버라이딩의 차이점 (0) 2019.10.28 [JAVA]랜덤 난수 만들기 (0) 2019.10.03 [JAVA 개념알기]변수 (0) 2019.09.30