티스토리 뷰

해당 장 목표

  • 이벤트 비유를 확장해 시스템으로 들어오거나 시스템으로 나가는 메시지까지 포용하는 방법
    • 이벤트 비유(Event Metaphor): 이벤트를 사용하여 시스템상의 다양한 컴포넌트들이 결합되지 않도록 하는 것

  • 외부 이벤트를 외부 메세지 버스를 통해 받고, 출력을 이벤트 형태로 외부 메시지 버스에 발행

11.1 분산된 진흙 공, 명사로 생각하기

  • 명사로 나누기: 기존 애플리케이션을 마이그레이션시 본능적으로 하게 되는 첫번째 일
  • 시스템에 도입된 명사 예시: 재고 배치, 주문, 상품, 고객

 

  • ReserveStock: 사용자가 재고가 있는 상품을 선택 후 재고 예약
  • ConfirmReservation: 사용자 예약 확인
  • DispatchGoods: 주문 완료 후 창고에서 출고
  • MakeCustomerVIP: 3번째 주문일 경우 고객을 VIP로 승격

: 데이터베이스 테이블 단위로 마이크로 서비스를 만들고 HTTP API를 CRUD API로 취급하는 방식

→단순한 시스템의 경우엔 잘 동작하지만 점점 의존성 그래프가 지저분해지기 시작

 

11.2 분산시스템에서 오류 처리하기

결합되다(coupled): 두 가지를 함께 바꿔야하는 경우

시간적 결합(temporal coupling): 연쇄적인 실패. 시스템의 모든 부분이 동시에 제대로 작동할 때만 정상으로 작동하는 경우

*동시생산(connascene)

더보기

동시생산이 나쁘지 않은 이유

  1. 명시적 의존성: 동시생산은 코드 요소들이 명시적으로 서로 의존하고 있음을 나타냅니다. 이는 코드의 동작을 명확하게 이해하고 추론하는 데 도움이 됩니다. 결합도 높은 코드보다 오히려 예측 가능성과 유지 보수성이 높을 수 있습니다.
  2. 응집도 향상: 동시생산 관계에 있는 코드 요소들은 서로 긴밀하게 관련되어 있으므로, 이들을 함께 변경해야 하는 이유가 명확할 때 응집도를 높일 수 있습니다. 높은 응집도는 모듈 내의 요소들이 동일한 기능을 중심으로 작동하게 하여 모듈의 책임을 명확히 합니다.
  3. 변경 용이성: 특정 기능을 변경할 때 관련된 요소들을 한꺼번에 변경할 수 있다는 점에서 변경이 용이합니다. 이는 변화의 영향을 국지적으로 제한하고, 전체 시스템에 미치는 영향을 최소화하는 데 기여할 수 있습니다.
  4. 테스트 용이성: 동시생산 관계를 인지하고 있는 경우, 관련된 코드 요소들을 함께 테스트함으로써 버그를 줄일 수 있습니다. 이는 테스트 케이스를 작성하고 유지 보수하는 데 도움이 됩니다.

결론적으로, 동시생산은 코드 요소들 간의 관계를 명확히 하여 시스템의 구조를 이해하고 유지 보수하는 데 긍정적인 영향을 줄 수 있습니다. 중요한 것은 이러한 관계를 인지하고 관리하여 코드의 품질을 유지하는 것입니다.

11.3 대안: 비동기 메시징을 사용한 시간적 결합

도메인 모델: 동사에 대한 모델

→ 주문과 배치가 아닌 주문 행위와 할당 행위에 대한 시스템에 대해 생각해보자

: 다른 모든 일은 언젠가 발생한다는 것만 보장할 수 있다면, 나중에 발생할 수 있다.

마이크로서비스도 애그리게이트와 비슷하게 일관성 경계여야 한다. 즉, 두 서비스 사이엔 최종 일관섬을 받아들일 수 있고 동기화된 호출에 의존하지 않아도 된다는 뜻이다.

 

비동기메시지로 시스템통합

  1. BatchQuantityChanged 메시지가 업스트림 시스템으로부터 외부 메시지로 도착
  2. 시스템을 이벤트를 리슨하는 다운스트림 시스템을 위해 Allocated 이벤트 발행

장점

  • 각 부분이 서로 독립적으로 실패 → 잘못된 동작 발생시 처리가 쉽다
  • 시스템 사이의 결합 강도를 감소시킬 수 있다.

11.4 레디스 발행/구독 채널을 통합에 사용하기

메시지 브로커: 이벤트를 시스템 밖으로 보내고 다른 시스템 안으로 넣는 서비스를 위한 메시지 버스같은 것. 발행자로부터 메시지를 받아서 구독자에게 배달.

  • ex: 이벤트 스토어, 카프카, RabbitMQ, Redis

11.5 엔드 투 엔드 테스트를 사용해 모든 기능 시범 운영하기

import json
import pytest
from tenacity import Retrying, RetryError, stop_after_delay
from . import api_client, redis_client
from ..random_refs import random_batchref, random_orderid, random_sku


@pytest.mark.usefixtures("postgres_db")
@pytest.mark.usefixtures("restart_api")
@pytest.mark.usefixtures("restart_redis_pubsub")
def test_change_batch_quantity_leading_to_reallocation():
    # start with two batches and an order allocated to one of them
    # 주문 라인의 재할당을 불러일으키는 이벤트를 시스템에 보내려고 한다. 
    # 그 다음 레디스에서 이벤트로 재할당이 발생하는지 살펴본다.
    orderid, sku = random_orderid(), random_sku()
    earlier_batch, later_batch = random_batchref("old"), random_batchref("newer")
    # api_client: 두 유형의 테스트 사이에 공유하기 위해 리팩터링된 도우미 함수
    api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta="2011-01-01")
    api_client.post_to_add_batch(later_batch, sku, qty=10, eta="2011-01-02")
    response = api_client.post_to_allocate(orderid, sku, 10)
    assert response.json()["batchref"] == earlier_batch

    subscription = redis_client.subscribe_to("line_allocated")
    
    # redis_client: 여러 레디스 채널에 메시지를 보내거나 여러 채널에서 메시지를 받는 것
    # change quantity on allocated batch so it's less than our order
    redis_client.publish_message(
        "change_batch_quantity",
        {"batchref": earlier_batch, "qty": 5},
    )

    # wait until we see a message saying the order has been reallocated
    # 테스트 대상 시스템이 비동기적이므로 tenacity 라이브러리를 재사용해 다시 시도 루프를 추가
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
                print(messages)
            data = json.loads(messages[-1]["data"])
            assert data["orderid"] == orderid
            assert data["batchref"] == later_batch

11.5.1 레디스는 메시지 버스를 감싸는 다른 얇은 어댑터

  • 레디스 발행/구독 리스너는 플라스크와 유사하다. 이벤트 리스너는 외부 세계를 변환해 이벤트로 만든다.
import json
import logging
import redis

from allocation import config
from allocation.domain import commands
from allocation.adapters import orm
from allocation.service_layer import messagebus, unit_of_work

logger = logging.getLogger(__name__)

r = redis.Redis(**config.get_redis_host_and_port())


def main():
		# 시작시 change_batch_quantity 채널 구독
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe("change_batch_quantity")

    for m in pubsub.listen():
        handle_change_batch_quantity(m)


def handle_change_batch_quantity(m):
		# 시스템 진입점에서 해야할 일은 JSON을 역직렬화한 객체를 Command로 변환해서 서비스 계층으로 넘기는 일
			#플라스크 어댑터와 비슷
    logging.debug("handling %s", m)
    data = json.loads(m["data"])
    cmd = commands.ChangeBatchQuantity(ref=data["batchref"], qty=data["qty"])
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())


if __name__ == "__main__":
    main()

다운스트림 어댑터: 반대방향의 변환(도메인 이벤트를 공개 이벤트로 변환) 처리

import json
import logging
from dataclasses import asdict
import redis

from allocation import config
from allocation.domain import events

logger = logging.getLogger(__name__)

r = redis.Redis(**config.get_redis_host_and_port())


def publish(channel, event: events.Event):
    logging.debug("publishing: channel=%s, event=%s", channel, event)
    r.publish(channel, json.dumps(asdict(event)))

11.5.2 외부로 나가는 새 이벤트

Allocated 이벤트: 할당에 대해 알아야할 필요가 있는 모든 내용 저장. 이를 모델의 allocate() 매서드에 추가

class Product:
    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # type: List[events.Event]

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            batch.allocate(line)
            self.version_number += 1
            self.events.append(
                events.Allocated(
                    orderid=line.orderid,
                    sku=line.sku,
                    qty=line.qty,
                    batchref=batch.reference,
                )
            )
            return batch.reference
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))
            return None

11.6 내부 이벤트와 외부 이벤트 비교

내부와 외부 이벤트 구분을 명확히 하자.

→ 외부로 나가는 이벤트는 검증을 적용하는 것이 중요하다.

11.7 마치며

이벤트는 외부에서 들어올 수도 있고, 외부로 발행할 수도 있다.

이런 종류의 시간적 결합을 사용하면 애플리케이션 통합시 적당한 유연성을 가질 수 있으나 여기에도 트레이드 오프가 있다.

반응형
댓글