티스토리 뷰

AWS

DynamoDB Stream

GDYOON 2022. 11. 23. 10:09

DynamoDB Stream이란

DynamoDB 테이블의 변경사항(INSERT, MODIFY, REMOVE)에 대한 정보를 스트림 형태로 처리할 수 있도록 해주는 기능

 

사용 사례

  • DynamoDB 테이블의 데이터를 다른 시스템의 저장소에 기록하여 동기화된 복제본 생성
  • 모바일 앱과 연동된 DynamoDB 테이블의 데이터가 수정되고 이를 캡쳐/저장하여 실시간 사용량을 측정
  • 그룹에 속한 친구가 사진을 새로 업로드하면 애플리케이션에서 그룹에 속한 모든 친구들에게 즉시 알림을 전송
  • 새로운 고객이 생기면 DynamoDB 테이블에 데이터가 추가되고, 해당 이벤트가 발생하면 새 고객에게 환영 이메일을 발송하는 애플리케이션이 호출

 

DynamoDB Stream에 대한 엔드포인트

AWS 에서는 아래 그림과 같이 DynamoDB와 DynamoDB Stream에 대한 엔드포인트가 나누어져 있다.

DynamoDB의 테이블 변경 사항을 캡쳐하려면 동일한 리전의 DynamoDB Stream 엔드포인트에 접근해야 한다.

 

스트림 판독 및 처리

DynamoDB 스트림 판독 및 처리를 설명하기 전에 간단히 용어를 정리한다.

  • 레코드 : DynamoDB 테이블의 단일 데이터 변경 사항(INSERT, MODIFY, REMOVE)을 의미
  • 샤드 : 다중 레코드 저장소 역할을 하며, 레코드 엑세스 및 반복처리에 필요한 정보를 의미

 

샤드는 한시적이며 필요에 따라 자동으로 생성되었다가 삭제된다.

모든 샤드는 여러 개의 새로운 샤드로 분할될 수 있으며, 테이블에서 쓰기 활동이 활발한 경우 여러 샤드로 부터 동시에 레코드를 처리할 수 있도록 샤드가 분할될 수 있다.

 

다음은 스트림, 샤드, 레코드 간 관계에 대한 그림이다.

그림과 같이 DynamoDB Stream은 여러개의 샤드로 구성되어 있고 각 샤드는 여러개의 레코드를 포함한다.

 

DynamoDB Stream의 특징

스트림

  • 스트림은 최대 24시간 저장된다.
  • 스트림은 실시간 그 이후 데이터만을 반환하는 것이 아니라 최근 24시간의 레코드를 리턴한다.
  • 스트림을 얻는(describeStream)은 계정별 1초당 최대 10번 호출 할 수 있다.

샤드

  • 데이터의 변화가 없어도 샤드는 열리고 닫힌다.
  • 이미 닫힌 샤드는 EndingSequenceNumber를 포함한다.
  • 닫힌 샤드의 레코드엔 NextShardIterator가 존재하지 않는다.
  • 닫힌 샤드는 한번만 처리하면 된다.
  • 열려있는 샤드는 여러번 참조하여 레코드의 정보를 얻어와야 한다.
  • 열려있는 샤드는 항상 NextShardIterator가 존재한다.
  • 샤드는 여러 레코드로 이루어져 있으므로 특정 위치로 이동하기 위해서는 SequenceNumber 값을 파라미터로 설정한다.

 

DynamoDB 의 변경 사항 캡쳐하기

DynamoDB 테이블의 변경 사항을 캡쳐하는 방법에 대해 알아본다.

해당 문서는 Python 기준 AWS 공식 SDK boto3 패키지를 사용하여 작성한다.

 

스트림 활성화

DynamoDB 테이블 > 개요 > 스트림 세부 정보 > 스트림 관리 를 선택한다.

그 다음 스트림에 기록될 정보를 선택한다.

  • Keys only(키만 보기) — 수정된 항목의 키 속성만을 표시한다.
  • New image(새 이미지) — 항목의 수정 후 전체 모습을 보여 준다.
  • Old image(이전 이미지) — 항목의 수정 전 전체 모습을 보여  준다.
  • New and old images(새 이미지와 이전 이미지) — 항목의 새 이미지와 이전 이미지를 모두 보여 준다.

Keys only를 선택하는 경우 변경 사항에 대한 레코드의 키값만 가져온다.

DynamoDB에 다시 쿼리 해야 전체 데이터의 정보를 얻어올 수 있지만 잦은 읽기로 사용량 및 요금을 증가시킬 수 있다.

Old image가 포함된 정보는 payload의 낭비가 될 수 있으므로 선택하지 않는다.

New image를 선택하는 경우 키값을 포함한 변경된 이후의 데이터를 보여주므로 해당 항목을 선택한다.

 

스트림 / 샤드 얻어오기

DynamoDB Stream의 API 중 DescribeStream을 사용하여 스트림의 정보를 얻어온다.

다음 샤드의 ID 부터 가져오고자 할 땐, ExclusiveStartShardId를 설정한다. 이에 해당하는 조건은 아래와 같다.

  • 반환할 수 있는 샤드의 개수를 초과한 경우
  • 닫힌 샤드에 대해 참조할 필요가 없는 경우

DescribeStream — 해당 스트림에 대한 세부 정보를 반환한다. 출력된 정보에는 스트림에 연계된 샤드의 목록과 샤드 ID가 포함되어 있다.

 

위의 내용에 대해 아래와 같이 요청해보자.

response = client.describe_stream(
    StreamArn='string',
    Limit=123,
    ExclusiveStartShardId='string'
)

 

다음은 응답 결과이다.

{
    'StreamDescription': {
        'StreamArn': 'string',
        'StreamLabel': 'string',
        'StreamStatus': 'ENABLING'|'ENABLED'|'DISABLING'|'DISABLED',
        'StreamViewType': 'NEW_IMAGE'|'OLD_IMAGE'|'NEW_AND_OLD_IMAGES'|'KEYS_ONLY',
        'CreationRequestDateTime': datetime(2015, 1, 1),
        'TableName': 'string',
        'KeySchema': [
            {
                'AttributeName': 'string',
                'KeyType': 'HASH'|'RANGE'
            },
        ],
        'Shards': [
            {
                'ShardId': 'string',
                'SequenceNumberRange': {
                    'StartingSequenceNumber': 'string',
                    'EndingSequenceNumber': 'string'
                },
                'ParentShardId': 'string'
            },
        ],
        'LastEvaluatedShardId': 'string'
    }
}

 

샤드 이터레이터 얻기

샤드를 얻으면, 샤드 ID를 통해 레코드를 처리해야 하므로 먼저 샤드 이터레이터를 가져온다.

GetShardIterator — 샤드 내 위치를 설명하는 샤드 반복자를 반환한다. 반복자가 스트림의 가장 오래된 지점, 최신 지점 및 특정 지점에 대한 액세스를 제공하도록 요청할 수 있다.

ShardIteratorType에는 여러 옵션이 있지만 최신 데이터만 받을 필요가 있으므로 LATEST 로 설정한다.

응답 결과로 샤드 이터레이터를 얻는다.

 

위의 내용에 대해 아래와 같이 요청해보자.

response = client.get_shard_iterator(
    StreamArn='string',
    ShardId='string',
    ShardIteratorType='TRIM_HORIZON'|'LATEST'|'AT_SEQUENCE_NUMBER'|'AFTER_SEQUENCE_NUMBER',
    SequenceNumber='string'
)

 

다음은 응답 결과이다.

{
    'ShardIterator': 'arn:aws:dynamodb:us-west-2:111122223333:table/Forum/stream/2015-05-20T20:51:10.252|1|AAAAAAAAAAEvJp6D+zaQ...  <remaining characters omitted> ...',
    'ResponseMetadata': {
        '...': '...',
    },
}

 

레코드 처리하기

샤드 이터레이터를 얻으면 레코드를 요청할 때 사용한다.

GetRecords — 해당 샤드 내에서 스트림 레코드를 반환한다. GetShardIterator 요청으로부터 반환된 샤드 반복자를 제공해야 한다.

레코드는 여러개가 될 수 있으므로 다음 샤드 이터레이터를 참조하려면 응답 결과의 NextShardIterator를 get_records() 를 호출 할 때, ShardIterator의 인자로 넣어야 한다.

여기서 레코드의 데이터는 없더라도 위에 설명한바와 같이 열려있는 샤드는 NextShardIterator가 항상 존재한다.

반대로 닫힌 샤드는 NextShardIterator가 없다.

 

다음은 레코드를 얻는 요청 예제이다.

response = client.get_records(
    ShardIterator='string',
    Limit=123
)

 

실제로 레코드의 데이터는 dynamodb 속성 아래에 존재한다.

그리고 위에서 스트림에 기록될 정보에는 New Image를 선택하였으므로 해당 정보만 응답 결과로 받게 된다.

{
    'Records': [
        {
            'eventID': 'string',
            'eventName': 'INSERT'|'MODIFY'|'REMOVE',
            'eventVersion': 'string',
            'eventSource': 'string',
            'awsRegion': 'string',
            'dynamodb': {
                'ApproximateCreationDateTime': datetime(2015, 1, 1),
                'Keys': {
                    'string': {
                        'S': 'string',
                        'N': 'string',
                        'B': b'bytes',
                        'SS': [
                            'string',
                        ],
                        'NS': [
                            'string',
                        ],
                        'BS': [
                            b'bytes',
                        ],
                        'M': {
                            'string': {'... recursive ...'}
                        },
                        'L': [
                            {'... recursive ...'},
                        ],
                        'NULL': True|False,
                        'BOOL': True|False
                    }
                },
                'NewImage': {
                    'string': {
                        'S': 'string',
                        'N': 'string',
                        'B': b'bytes',
                        'SS': [
                            'string',
                        ],
                        'NS': [
                            'string',
                        ],
                        'BS': [
                            b'bytes',
                        ],
                        'M': {
                            'string': {'... recursive ...'}
                        },
                        'L': [
                            {'... recursive ...'},
                        ],
                        'NULL': True|False,
                        'BOOL': True|False
                    }
                },
                'OldImage': {
                    'string': {
                        'S': 'string',
                        'N': 'string',
                        'B': b'bytes',
                        'SS': [
                            'string',
                        ],
                        'NS': [
                            'string',
                        ],
                        'BS': [
                            b'bytes',
                        ],
                        'M': {
                            'string': {'... recursive ...'}
                        },
                        'L': [
                            {'... recursive ...'},
                        ],
                        'NULL': True|False,
                        'BOOL': True|False
                    }
                },
                'SequenceNumber': 'string',
                'SizeBytes': 123,
                'StreamViewType': 'NEW_IMAGE'|'OLD_IMAGE'|'NEW_AND_OLD_IMAGES'|'KEYS_ONLY'
            },
            'userIdentity': {
                'PrincipalId': 'string',
                'Type': 'string'
            }
        },
    ],
    'NextShardIterator': 'string'
}

 

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함