[비동기 이벤트 처리] Inotify + RabbitMQ를 활용한 실시간 파일 감지 및 비동기 처리 파이프라인 구축

2026. 1. 9. 17:06·dev/backend

1. 서론

회사에서 폐쇄망 운영 환경의 SI 사업에 투입됐다.

 

본 사업의 목표는 일 평균 10만건 데이터를 송수신 하는 인터페이스를 구축 및 고도화 하는 것이다. 보안상 상세하게 기재하진 못하지만 간단하게 운영환경을 기술하자면, 총 3대의 서버 (Rocky Linux 9.5) 가 구동된다. 물리적인 서버 3대가 있는 이유는 서버 이중화 와 예비 서버 용도이다. 이 3대 중 한 1대만 Active 상태를 유지하고 해당 서버를 통해 항공 데이터(.txt) 가 송수신된다.

 

데이터 양은 하루 평균 10만건이다. 또한, 데이터 종류가 8가지이고 데이터 종류별로 타깃 단위 시스템에 매핑해서 송수신 해야 하는데 이 단위 시스템 수만 118개이다.... ㄷㄷ 처음엔 막막했지만 다행히 이 중 15개 시스템에만 연계가 되게끔 인터페이스를 구현하면 된다는 얘기를 듣고 조금 괜찮아졌다.

 

개발이 마무리 된 시점에서 연계 시스템을 구축하면서 겪었던 문제와 고민했던 내용을 기록해보고자 블로그에 작성해보고자 한다.

2. 본론

데이터 양을 보고 예전 <GAGU> 사이드 플젝에서 채팅 기능 구현할 때 사용했던 `RabbitMQ` 를 무조건 사용해야겠다고 생각이 들었다. 문제는 '실시간으로 데이터를 어떻게 감지하냐' 였다. 실시간으로 데이터를 감지 하는 내용을 얘기 하기 전에 송수신 과정에서의 요구사항에 대해 이해가 필요할 것 같다.

 

발주처에서 요구하는 송수신 과정은 다음과 같이 이루어 진다.

 1. 시스템 내 FDS (File Delivery System) 에 송신 하고자 하는 파일 삽입 -> Gateway 의 /*/temp 디렉토리로 파일 전송

 2. /temp 디렉토리에서 해당 데이터 바디 값이 MDD 를 준수하였는지 유효성 검증 후 /succ 또는 /fail 폴더로 이동

 3. /succ 디렉토리로 들어오는 데이터만 연계 시스템으로 전송 (.txt 파일 생성)

º 보안상 문제로 System A, System B 로 정의
º System A 를 기준으로 송수신 정의됨

다행히 1번과 2번은 레거시 FDS 시스템에 종속적이여서 바꾸기 힘들다고 해서 /succ 파일에 들어오는 데이터만 문제없이 연계하면 되는 상황이였다.

 

필자가 이러한 요구사항을 구현하기 위한 시나리오는 다음과 같다.

1. 메시지 및 타깃 시스템별 데이터를 담는 Queue 생성

2. /*/succ 로 들어오는 데이터를 실시간으로 감지 (단, 중복으로 데이터를 감지하면 안됨)

3. 실시간으로 감지된 데이터를 정의된 라우팅 키로 Queue 에 삽입 

4. Queue 에 적재된 데이터를 타깃 시스템에서 비동기로 처리 

 

이 시나리오에서의 핵심은 실시간 감지였다..!

그래서 리눅스 환경과 호환이 잘 되는 디렉토리 이벤트 감지 서비스를 부지런히 찾아보았고, 이 과정에서 Polling 감지, Telegraf 사용, Inotify 커널 이벤트 라는 방법들을 직접 사용해보고 비교해보았다.

 

2.1. 신규 데이터 실시간 감지

2.1.1. polling (full-scan) - 성능 이슈 염려하여 기각

일단 가장 간단하게 '구현만' 할 수 있는 방법은 polling 방식이 있을 것 같았다.

주기적으로 `linux date(서버 현재 시각)`과 파일 생성시간을 비교하여 최신 일자일 경우 신규 데이터로 감지하는 것이다.

 

이렇게 되면 스케줄러로 주기적으로 현재 시간과 +- 오차 시간을 적용하여 비교 후 처리 할텐데, 예외 상황이 많을 것 같았다. 스케줄러가 동작하지 않거나 지연이 됐을 때 데이터가 누락되는 현상이 발생할 수 있고, polling 으로 불필요한 조회 연산을 지속적으로 호출하게끔 되어 해당 방법은 소거하였다.

 

 

2.1.2. Telegraf - Queue 삽입까지 너무 오래걸려서 기각

모니터링 시스템 구현 관련 라이브러리를 찾던 중 `Telegraf` + `Grafana` 조합을 많이들 사용한다는 것을 알게 되었고, 이 중 `Telegraf` 에 실시간 감지 기능이 내장돼 있다는 걸 확인했다.

 

`Telegraf`는 `InfluxData`에서 개발하였으며 데이터베이스나 서버의 데이터 (디렉토리도 가능) 를 수집하는 Agent 역할을 한다. Docker hub 에도 공식 이미지가 있고, `telegraf.conf` 라는 설정 파일로 Agent 를 조작하는 느낌이라 설정이 간편했다.

https://hub.docker.com/_/telegraf

 

telegraf - Official Image | Docker Hub

Quick reference Supported tags and respective Dockerfile links Quick reference (cont.) What is telegraf? Telegraf is an open source agent for collecting, processing, aggregating, and writing metrics. Based on a plugin system to enable developers in the com

hub.docker.com

 

테스트 용으로 해당 디렉토리에 새로 생성 + 이동 됐을 때 로그로 바디 값을 출력하는 설정을 주었다.

 

telegraf.conf

[agent]

# 데이터 수집 주기
interval = "10s" 

# 수집된 데이터 처리 주기
flush_interval = "10s"

[[inputs.tail]] # 파일 내용 추적 섹션

# 감시할 파일 경로
files = ["/watch-data/*.txt"]

# 파일 새로 생겼을 때 조작
from_beginning = true

# 파일 안에 있는 값 그대로 사용
data_format = "value"

# 데이터 타입 지정
data_type = "string"
name_override = "txtfile_monitor"

[[outputs.file]]

# 파일 바디 값 출력
files = ["stdout"]
data_format="json"

 

이후 Docker hub 에 기재돼 있는대로 container 구동을 하였다.

docker run -d --name telegraf
  -v {telegraf.conf 실제 경로}:/etc/telegraf/telegraf.conf:ro \
  -v {타겟 디렉토리 경로}:/watch-data \
  telegraf

 

테스트는 다음과 같이 진행했고, 데이터의 바디값이 정상적으로 출력됨을 확인했다.

1. 새로운 txt 생성

2. /test 폴더 mv 명령어로 이동

3. 로그 확인

 

이후 단순 출력하는 부분을 RabbitMQ Queue 로 삽입 되게끔 수정하여 테스트 했을 때, 정상적으로 삽입되어 원하는 기능을 테스트 해볼 수 있었다.

[agent]
interval = "10s"
flush_interval = "10s"
metric_batch_size = 1

[[inputs.tail]]
files = ["/watch-data/*.txt"]
from_beginning = true
data_format = "value"
data_type = "string"
name_override = "txtfile_monitor"

[[outputs.amqp]] # rabbitMQ 로 삽입
brokers = ["amqp://guest:guest@{서버 IP}:5672"]
data_format = "json"
exchange = "test.exchange"
exchange_type = "direct"
routing_key = "test.routing.key"
delivery_mode = "persistent"
exchange_durability = "durable"

 

다만 아쉬운건 이벤트 감지는 즉각적으로 일어나지만, Queue 까지 가기까지의 시간이 너무 오래 걸렸다.

단순히 데이터 1건을 이벤트 감지하고 Queue 에 삽입하는데 10초가 소요됐다.

이유는 `Telegraf` 가 `Node-exporter` 처럼 메트릭 수집 도구 용도로 만들어졌기 때문이다. 어떤 데이터가 들어오든 내부적으로 `Telegraf Metric` 이라는 타입의 데이터로 변환한다.

`데이터 읽기` -> `Telegraf Metric 변환` -> `버퍼에 저장` -> `Output 플러그인에서 다시 JSON으로 변환` -> `전송`

 

단순히 데이터를 감지하고 Queue 에 삽입하는 것이 아닌 앞단에 데이터 변환 과정이 있어 오래 걸린다고 한다..

이 과정을 커스텀 하는 것이 어려워 보였고, 만약 10만건이 동시에 몰렸을 때 수행된다고 한다면 병목현상이 발생할 것 같았다.

 

또 다른 문제는 Queue 에 보낼 때이다. '하나 보낼 때 오래걸리면, 대량을 묶어서 보내면 되지 않나?' 라고 생각이 들 수 있다.

`Telegraf` 는 효율성을 위해 기본적으로 데이터를 Batch 로 묶어 보낸다. 그래서 앞선 `Telegraf.conf` 에서 `metric_batch_size = 1` 로 설정을 할 수 있고 만약 전송 성능을 향상 시키려면 이 값을 높이면 된다.

 

만약 1000개를 한 배치로 보내는데 이 중 100번째 데이터가 문제가 있다고 하면 데이터 정합성 문제가 발생할 수 있겠다 싶었다. 

이러 저러한 이유로 `Telegraf` 도 아쉽게 기각됐다.

 

2.1.3. Linux Inotify - 제일 적합

`Telegraf` 를 떠나보내고.. 지친 상태로 README.md 를 읽던 중 Telegraf 가 Linux-Inotify 기반으로 만들어진 것을 알게 됐다.

https://github.com/influxdata/telegraf/blob/master/plugins/inputs/tail/README.md

 

telegraf/plugins/inputs/tail/README.md at master · influxdata/telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data. - influxdata/telegraf

github.com

(이래서 README.md 를 잘 읽어야.....)

이러한 이유로 Inotify 를 써보게 됐다.

 

`Inotify` 는 리눅스 커널에 내장된 파일 시스템 감시 서브시스템으로 별도의 설치가 필요 없지만, 이를 특정 디렉토리를 감시하게끔 커스텀하고 RabbitMQ 와 연동하기 위해선 `inotify-tools` 패키지를 써야 된다고 한다.

 

Linux 내장 기능이기 때문에, Docker 기본 이미지는 가벼운 Linux 이미지를 사용하였고 관련된 패키지들을 빌드할 때 설치하였다.

 

Dockerfile

# 1. Alpine Linux 기반
FROM alpine:3.18

# 2. inotify-tools, python3, pip 설치
RUN apk add --no-cache inotify-tools python3 py3-pip

# Python 전용 RabbitMQ 통신 프로토콜 라이브러리 패키지
RUN pip3 install pika --break-system-packages

# 이벤트 발생시 RabbitMQ Queue 삽입 로직
COPY publisher.py /publisher.py

# Inotify-tools 스크립트
COPY watch-and-publish.sh /watch-and-publish.sh
RUN chmod +x /watch-and-publish.sh

# 실행
CMD ["/watch-and-publish.sh"]

 

`Dockerfile` 에 있는 `/watch-and-publish.sh` 가 `inotify-tools` 를 사용해 특정 디렉토리에 신규 파일 생성 이벤트를 감지하는 스크립트고, `publisher.py` 가 Linux - RabbitMQ 서버 간 연동을 해주는 파이썬 코드다.

 

watch-and-publish.sh

#!/bin/sh

WATCH_DIR_INSIDE_CONTAINER=${WATCH_DIR:-"/data"}

echo ">>> File Watcher started."
echo ">>> Watching: $WATCH_DIR_INSIDE_CONTAINER"
echo ">>> Target RabbitMQ: $RMQ_HOST (AMQP)"

# WATCH_DIR_INSIDE_CONTAINER 를 관측해라
inotifywait -m -q -e close_write -e moved_to --format '%w%f' "$WATCH_DIR_INSIDE_CONTAINER" | \
python3 -u /publisher.py

여기에서 inotifywait이 특정 디렉토리를 관측하다가, 이동 이벤트를 감지하면 해당 파일 경로를 stdout 하고 파이프 구조를 통해 publisher.py 이 stdin으로 받는 구조다. 결국 python 에는 이벤트가 감지된 파일 경로가 들어간다.

 

publisher.py

import sys
import os
import pika
import json
import time
import select

# RabbitMQ 연결 설정
RMQ_HOST = os.environ.get('RMQ_HOST', 'localhost')
RMQ_PORT = int(os.environ.get('RMQ_PORT', 5672))
RMQ_USER = os.environ.get('RMQ_USER', 'guest')
RMQ_PASS = os.environ.get('RMQ_PASS', 'guest')
EXCHANGE = os.environ.get('RMQ_EXCHANGE', 'file.events')
ROUTING_KEY = os.environ.get('RMQ_ROUTING_KEY', 'file.new')

credentials = pika.PlainCredentials(RMQ_USER, RMQ_PASS)
parameters = pika.ConnectionParameters(host=RMQ_HOST, port=RMQ_PORT, credentials=credentials, heartbeat=60)

print(f">>> Python Publisher connecting to {RMQ_HOST}:{RMQ_PORT} (AMQP)...")

connection = None

# pika 패키지로 RabbitMQ 연결
try:
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct', durable=True)
  
    channel.confirm_delivery()

    print(">>> Connection established. Waiting for input...")

    # stdin 으로 받은 파일명
    while True:
        rlist, _, _ = select.select([sys.stdin], [], [], 1.0)

        if rlist:
            
            line = sys.stdin.readline()
            if not line:
                break
			
            file_path = line.strip()
            if not file_path:
                continue
			
            # 파일을 직접 읽음
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                
                # RabbitMQ 로 보낼 Payload 로 구성
                payload = {
                    "filePath": file_path,
                    "fileContent": content
                }
                body = json.dumps(payload)
				
                # 해당 라우팅 키로 전송
                channel.basic_publish(
                    exchange=EXCHANGE,
                    routing_key=ROUTING_KEY,
                    body=body,
                    properties=pika.BasicProperties(
                        delivery_mode=2,
                        content_type='application/json'
                    )
                )
                print(f"Sent: {file_path}")

            except Exception as e:
                print(f"Error processing file {file_path}: {e}", file=sys.stderr)
        else:
            connection.process_data_events()

except KeyboardInterrupt:
    print("Stopping...")
except Exception as e:
    print(f"Critical Connection Error: {e}", file=sys.stderr)
finally:
    if connection and connection.is_open:
        connection.close()

`publisher.py` 에서 stdin 으로 받은 파일명으로 직접 읽어서 해당 바디값을 페이로드로 Queue 에 삽입하는 구조다.

이렇게 스트립트들을 구성하고 docker container 로 구동시켰다.

 

그리고 발주처에서 제공한 파일들을 mv 명령어로 이동 됐을 때 Queue 에 정상적으로 삽입 되는 걸 확인 할 수 있었다..!

 

2.1.4. 부하 테스트 후 개선

앞서 기술했다시피 꽤 대량의 데이터가 송수신된다. 그렇기 때문에 부하테스트를 해보기로 했고, 발주처에서 정의한 부하 정도는 3천건이라고 얘기하긴 했지만 필자는 발주처에서 제공한 데이터 전부를 넣어보고 부하테스트를 진행하였다.

 

20781 건을 동시에 감시 디렉토리로 이동하는 테스트를 진행했고, 결과는 처참했다ㅋㅋㅋㅋㅋㅋ

 

 

초당 약 148건의 속도로 삽입됐는데 약 2분정도? 소요됐다. 근데 데이터는 2,465 건 누락됐다..ㅎ

지금은 웃으면서 쓰지만 당시에 매우 식은땀이 났다. 이대로 가면 중요한 항공 데이터를 내가 날려먹는 상황이 나온다는 생각에 AI 의 힘을 빌렸다.

 

AI 가 알려준 문제로는 크게 3가지였다.

1. 동기 ACK 대기: 데이터 하나 송신 후 제대로 갔는지 체크하는 대기 시간 발생하여 대량 전송 시 병목현상 발생

2. readLine(): readline()은 한 줄씩 시스템콜을 발생시켜 대량 입력 시 I/O 오버헤드가 큼.

3. max_queued_events: inotify 이벤트 큐에 적재할 수 있는 최대 이벤트 수가 기본 값으로 2만건 수용이 안됨.

 

이를 해결 하기 위해서 다음과 같은 개선 사항을 알려줬다.

 

1. 동기 ACK 대기 제거 (Publisher Confirm 비활성화)

pika 패키지의 confirm_delivery()는 메시지를 1건 보낼 때마다 RabbitMQ broker가 "잘 받았어"라고 응답(ACK)을 줄 때까지 기다리는 기능이다. 비유하자면 택배를 보내는데, 한 박스 보내고 → 상대방이 "받았습니다" 전화할 때까지 기다리고 → 다음 박스 보내고 → 또 전화 기다리고… 이걸 2만번 반복하는 셈이다.

 
# Before: 매 건마다 broker ACK를 동기적으로 기다림
channel.confirm_delivery()

# After: ACK 대기 없이 연속 전송 (fire-and-forget)
# channel.confirm_delivery()

이걸 끄면 택배를 쉬지 않고 연속으로 보내는 것과 같다. 대신 1000건마다 process_data_events()로 broker와 상태를 한번씩 확인하여 연결이 막히지 않게 했다. 참고로 confirm을 꺼도 TCP 연결 위에서 AMQP 프로토콜로 통신하기 때문에 메시지가 broker에 도달하는 것 자체는 보장된다.

 

2. readline() → os.read() 청크 단위 읽기

sys.stdin.readline()은 파이프에서 데이터를 한 줄씩 읽는다. 파일 경로가 2만건 들어오면 시스템콜이 2만번 발생한다.

# Before: readline() - 한 줄씩 읽기
# 파이프에 "file1\nfile2\nfile3\n..." 이 들어오면
#   1번째 시스템콜 → "file1\n" 획득
#   2번째 시스템콜 → "file2\n" 획득
#   3번째 시스템콜 → "file3\n" 획득
#   ... 2만번 반복

line = sys.stdin.readline()
# After: os.read() - 1MB를 한번에 읽기
# 파이프에 "file1\nfile2\nfile3\n..." 이 들어오면
#   1번째 시스템콜 → "file1\nfile2\nfile3\n...file800\n" 한번에 획득
#   직접 \n 기준으로 split해서 처리

chunk = os.read(sys.stdin.fileno(), 1024 * 1024)

마트에서 장바구니 없이 물건 하나 집고 계산하고, 다시 하나 집고 계산하는 것(readline)과, 카트에 한번에 담아서 한번에 계산하는 것(os.read)의 차이다. 시스템콜 횟수가 줄어드니 당연히 빨라진다.

 

3. max_queued_events 파라미터 변경

max_queued_events는 inotify 이벤트 큐에 적재할 수 있는 **최대 이벤트 수(건수)**다. 기본값은 16,384건인데, 2만건을 한번에 mv하면 이 큐가 넘쳐서 초과분 이벤트가 버려진다. 2,465건이 누락된 원인이 바로 이것이다.

# 기본값 확인
cat /proc/sys/fs/inotify/max_queued_events
# 16384

# 영구 적용 (reboot 후에도 유지)
echo "fs.inotify.max_queued_events = 49152" | sudo tee /etc/sysctl.d/99-inotify.conf
sudo sysctl -p /etc/sysctl.d/99-inotify.conf

이 설정은 호스트 커널 레벨 파라미터이므로, Docker 컨테이너 안이 아니라 호스트 서버에서 실행해야 한다. 컨테이너는 호스트 커널을 공유하기 때문에 호스트에서 변경하면 컨테이너에도 적용된다.

 

다시 테스트 해봤을 때 초당 2520건 삽입 되는 걸 확인 할 수 있었고, 데이터 누락없이 10초만에 모두 Queue 에 삽입 된 걸 볼 수 있었다. 대략 2분 -> 10초로 감소된 셈이다!

 

3. 마무리

누군가에겐 간단한 문제였을 수 있지만.. 처음 실시간 연계 파이프라인을 구축해보는 입장에서 꽤 어려웠고, 그만큼 많이 고민하면서 시스템 설계를 하였다. 이런 경험들이 쌓여서 다음 유사한 케이스에선 좀 더 삽질을 덜 해서 더욱 탄탄한 구조를 만들 수 있음 좋겠다!

'dev > backend' 카테고리의 다른 글

[Python] KoNLPy 자연어 형태소 분석  (0) 2025.12.29
[Python] SQLalchemy ORM  (0) 2025.12.24
[Python] BeautifulSoup 크롤링  (0) 2025.12.24
[Python] pip 패키지 목록 자동생성  (0) 2025.12.24
[Python] windows .venv 세팅  (1) 2025.12.22
'dev/backend' 카테고리의 다른 글
  • [Python] KoNLPy 자연어 형태소 분석
  • [Python] SQLalchemy ORM
  • [Python] BeautifulSoup 크롤링
  • [Python] pip 패키지 목록 자동생성
hand-mk
hand-mk
  • hand-mk
    보조기억장치
    hand-mk
  • 전체
    오늘
    어제
    • 분류 전체보기 (27) N
      • 회고록 (2) N
      • 자격증 (1)
        • aws (1)
      • dev (24)
        • se (5)
        • algorithm (6)
        • ai (3)
        • scm (1)
        • backend (9)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

    • Github
  • 공지사항

  • 인기 글

  • 태그

    telegraf
    ubuntu
    KoNLPy
    폐쇄망
    queryDSL
    java
    코테
    exaone3.5
    springboot
    WSL
    워드클라우드
    vectordb
    vmware
    Cloudflare
    docker
    ollama
    leetcode
    codesignal
    python
    linux
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.4
hand-mk
[비동기 이벤트 처리] Inotify + RabbitMQ를 활용한 실시간 파일 감지 및 비동기 처리 파이프라인 구축
상단으로

티스토리툴바