ログ監視・分析とは?セキュリティインシデントを早期発見する方法

ログ監視・分析とは?

ログ監視・分析とは、Webサーバー、アプリケーション、データベース、ネットワーク機器などが出力するログファイルを継続的に監視・分析し、セキュリティインシデントや異常な活動を早期発見する重要なセキュリティ対策です。

現代のサイバー攻撃は高度化・巧妙化しており、攻撃者は発見されないよう長期間潜伏することがあります。そのため、ログの詳細な分析によって、以下のような活動を検知することが重要になっています:

  • 不正ログイン試行
  • SQLインジェクション攻撃
  • 異常なAPIアクセス
  • 内部犯行
  • マルウェア感染
  • データ漏洩
  • 権限昇格攻撃

ログ監視は「デジタル鑑識の基礎」とも言われ、インシデント発生時の原因調査や被害範囲の特定において不可欠な情報源となります。

監視すべきログの種類

1. Webサーバーログ

アクセスログ(Apache/Nginx)

# Apache Combined Log Format
192.168.1.100 - - [10/Oct/2024:13:55:36 +0900] "GET /admin/login.php HTTP/1.1" 200 2326 "https://example.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"

# 監視すべき項目
- IPアドレス
- アクセス時刻
- リクエストURL
- HTTPステータスコード
- User-Agent
- リファラー

エラーログ

# Apache Error Log
[Mon Oct 10 13:55:36.123456 2024] [php7:error] [pid 12345] [client 192.168.1.100:54321] PHP Fatal error: Uncaught Error: Call to undefined function mysql_connect()

# 重要なエラーパターン
- PHP Fatal Error
- SQL接続エラー
- Permission denied
- File not found(404エラーの大量発生)

2. アプリケーションログ

認証ログ

// 成功例
{
  "timestamp": "2024-10-10T13:55:36Z",
  "level": "INFO",
  "event": "user_login_success",
  "user_id": "12345",
  "username": "john.doe",
  "ip_address": "192.168.1.100",
  "user_agent": "Mozilla/5.0..."
}

// 失敗例(要注意)
{
  "timestamp": "2024-10-10T13:55:36Z",
  "level": "WARNING",
  "event": "user_login_failed",
  "username": "admin",
  "ip_address": "10.0.0.1",
  "failure_reason": "invalid_password",
  "attempt_count": 5
}

SQLクエリログ

-- 正常なクエリ
SELECT * FROM users WHERE id = ?

-- 疑わしいクエリ(SQLインジェクション)
SELECT * FROM users WHERE id = '1' OR '1'='1' --
SELECT * FROM users WHERE id = '1'; DROP TABLE users; --

3. システムログ(Linux/Windows)

Linux(rsyslog/journald)

# /var/log/auth.log - 認証関連
Oct 10 13:55:36 server sshd[12345]: Failed password for root from 192.168.1.100 port 22 ssh2
Oct 10 13:55:40 server sshd[12346]: Accepted password for user from 192.168.1.101 port 22 ssh2

# /var/log/syslog - システム全般
Oct 10 13:55:36 server kernel: [12345.678901] iptables: DROP IN=eth0 OUT= SRC=192.168.1.100 DST=192.168.1.1 PROTO=TCP DPT=22

Windows(Event Log)

# セキュリティログ(Event ID)
4624: ログオン成功
4625: ログオン失敗
4634: ログオフ
4648: 明示的な資格情報を使用したログオン
4672: 特権の割り当て

4. データベースログ

MySQL

# General Log
2024-10-10T13:55:36.123456Z 12345 Query SELECT * FROM users WHERE email = 'user@example.com'

# Error Log
2024-10-10T13:55:36.123456Z 12345 [Warning] Access denied for user 'root'@'192.168.1.100' (using password: YES)

# Slow Query Log(異常に遅いクエリ)
# Time: 2024-10-10T13:55:36.123456Z
# User@Host: app_user[app_user] @ [192.168.1.100]
# Query_time: 30.123456  Lock_time: 0.000000 Rows_sent: 1000000  Rows_examined: 1000000
SELECT * FROM large_table WHERE condition_without_index = 'value';

5. ファイアウォール・IDS/IPSログ

# iptables
Oct 10 13:55:36 firewall kernel: IPTABLES-DROP: IN=eth0 OUT= SRC=10.0.0.1 DST=192.168.1.100 PROTO=TCP DPT=22 SPT=12345

# Suricata IDS
[10/10/2024-13:55:36.123456] [1] [1:2001219:20] ET SCAN Potential SSH Scan [Classification: Attempted Information Leak] [Priority: 2] {TCP} 10.0.0.1:12345 -> 192.168.1.100:22

攻撃パターンの検知

1. ブルートフォース攻撃の検知

# 同一IPからの連続ログイン失敗
grep "Failed password" /var/log/auth.log | awk '{print $11}' | sort | uniq -c | sort -nr

# 結果例(要注意)
15 192.168.1.100
12 10.0.0.1
3 192.168.1.101

Pythonでの自動検知スクリプト

import re
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta

class BruteForceDetector:
    def __init__(self, threshold=5, time_window=300):  # 5分間で5回失敗
        self.threshold = threshold
        self.time_window = time_window
        self.failed_attempts = defaultdict(deque)
        self.blocked_ips = set()

    def process_log_line(self, line):
        # ログイン失敗のパターンを検出
        pattern = r'Failed password for (\w+) from ([\d.]+)'
        match = re.search(pattern, line)

        if match:
            username, ip = match.groups()
            current_time = time.time()

            # 古い記録を削除
            while (self.failed_attempts[ip] and 
                   current_time - self.failed_attempts[ip][0] > self.time_window):
                self.failed_attempts[ip].popleft()

            # 新しい失敗を記録
            self.failed_attempts[ip].append(current_time)

            # 閾値を超えたかチェック
            if len(self.failed_attempts[ip]) >= self.threshold:
                if ip not in self.blocked_ips:
                    self.alert_brute_force(ip, username)
                    self.blocked_ips.add(ip)

    def alert_brute_force(self, ip, username):
        print(f"ALERT: Brute force attack detected from {ip} targeting user {username}")
        # ここでアラート送信、IPブロック等の処理を実行
        self.block_ip(ip)

    def block_ip(self, ip):
        # iptablesでIPをブロック
        import subprocess
        subprocess.run(['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'])
        print(f"IP {ip} has been blocked")

# 使用例
detector = BruteForceDetector()

# ログファイルをリアルタイム監視
import subprocess
import sys

def tail_f(filename):
    """tail -f のPython実装"""
    with open(filename, 'r') as f:
        f.seek(0, 2)  # ファイル末尾に移動
        while True:
            line = f.readline()
            if line:
                yield line.strip()
            else:
                time.sleep(0.1)

# ログ監視の開始
for line in tail_f('/var/log/auth.log'):
    detector.process_log_line(line)

2. SQLインジェクション攻撃の検知

import re

class SQLInjectionDetector:
    def __init__(self):
        # SQLインジェクションの典型的なパターン
        self.sqli_patterns = [
            r"(\b(union|select|insert|update|delete|drop|create|alter|exec|execute)\b.*){2,}",
            r"(\'|\%27)(\s)*((\%6f)|o|(\%4f))((\%72)|r|(\%52))",  # 'or
            r"(\'|\%27)(\s)*\d+(\s)*(\%3d|\=)(\s)*\d+",  # '1=1
            r"(\'|\%27)(\s)*\d+(\s)*(\-\-|\%2d\%2d)",    # '1--
            r"\bunion\b.+\bselect\b",                     # union select
            r"\b(and|or)\b\s+\d+\s*=\s*\d+",            # and 1=1
            r"(\'|\%27).*(sleep|benchmark|waitfor)",      # time-based
            r"(\'|\%27).*(concat|char|ascii)",           # function-based
        ]

        self.compiled_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in self.sqli_patterns]

    def detect_sqli(self, query_string):
        for pattern in self.compiled_patterns:
            if pattern.search(query_string):
                return True
        return False

    def analyze_access_log(self, log_line):
        # Apache/Nginxのアクセスログから疑わしいクエリを検出
        parts = log_line.split('"')
        if len(parts) >= 2:
            request = parts[1]  # "GET /path?param=value HTTP/1.1"

            if self.detect_sqli(request):
                ip = log_line.split()[0]
                self.alert_sqli_attempt(ip, request)

    def alert_sqli_attempt(self, ip, request):
        print(f"SQL Injection attempt detected from {ip}: {request}")
        # アラート送信処理

# 使用例
detector = SQLInjectionDetector()

# テストケース
test_queries = [
    "GET /search?q=test",  # 正常
    "GET /search?q=' OR '1'='1",  # SQLインジェクション
    "GET /user?id=1 UNION SELECT password FROM users",  # SQLインジェクション
]

for query in test_queries:
    if detector.detect_sqli(query):
        print(f"DETECTED: {query}")

3. 異常なAPIアクセスパターン

import time
from collections import defaultdict

class APIAbuseDetector:
    def __init__(self):
        self.request_counts = defaultdict(list)
        self.rate_limits = {
            '/api/login': (5, 300),      # 5回/5分
            '/api/search': (100, 60),    # 100回/1分
            '/api/data': (1000, 3600),   # 1000回/1時間
        }

    def check_rate_limit(self, ip, endpoint):
        if endpoint not in self.rate_limits:
            return True

        max_requests, time_window = self.rate_limits[endpoint]
        current_time = time.time()

        # 古いリクエストを削除
        self.request_counts[f"{ip}:{endpoint}"] = [
            req_time for req_time in self.request_counts[f"{ip}:{endpoint}"]
            if current_time - req_time < time_window
        ]

        # 新しいリクエストを追加
        self.request_counts[f"{ip}:{endpoint}"].append(current_time)

        # レート制限チェック
        if len(self.request_counts[f"{ip}:{endpoint}"]) > max_requests:
            self.alert_rate_limit_exceeded(ip, endpoint)
            return False

        return True

    def alert_rate_limit_exceeded(self, ip, endpoint):
        print(f"Rate limit exceeded: {ip} accessing {endpoint}")
        # 一時的なIPブロック処理

# ログ解析での使用
detector = APIAbuseDetector()

def analyze_api_access(log_line):
    # ログからIP、エンドポイントを抽出
    parts = log_line.split()
    ip = parts[0]
    request = parts[6] if len(parts) > 6 else ""

    # エンドポイントを抽出
    if '/api/' in request:
        endpoint = request.split('?')[0]  # クエリパラメータを除去

        if not detector.check_rate_limit(ip, endpoint):
            print(f"Blocking excessive requests from {ip} to {endpoint}")

ログ分析ツールと実装

1. ELK Stack(Elasticsearch, Logstash, Kibana)

Logstash設定(logstash.conf)

input {
  file {
    path => "/var/log/nginx/access.log"
    type => "nginx_access"
    start_position => "beginning"
  }

  file {
    path => "/var/log/nginx/error.log"
    type => "nginx_error"
    start_position => "beginning"
  }
}

filter {
  if [type] == "nginx_access" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }

    geoip {
      source => "clientip"
      target => "geoip"
    }

    # 疑わしいパターンの検出
    if [request] =~ /(union|select|insert|update|delete|drop)/i {
      mutate {
        add_tag => ["suspicious_sql"]
      }
    }

    if [agent] =~ /(bot|crawler|scanner)/i {
      mutate {
        add_tag => ["automated_request"]
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}

Kibana ダッシュボードでの可視化

{
  "dashboard": {
    "title": "Security Monitoring Dashboard",
    "panels": [
      {
        "type": "line_chart",
        "title": "Failed Login Attempts",
        "query": "event:user_login_failed",
        "time_field": "@timestamp"
      },
      {
        "type": "pie_chart", 
        "title": "Top Source IPs",
        "query": "tags:suspicious_sql",
        "field": "clientip"
      },
      {
        "type": "data_table",
        "title": "Recent Security Events",
        "query": "tags:(suspicious_sql OR brute_force)",
        "columns": ["@timestamp", "clientip", "request", "response"]
      }
    ]
  }
}

2. Splunk クエリ例

# ブルートフォース攻撃の検出
index=auth sourcetype=linux_secure "Failed password"
| rex field=_raw "Failed password for (?<user>\S+) from (?<src_ip>\S+)"
| stats count by src_ip, user
| where count > 5
| sort -count

# SQLインジェクション検出
index=web sourcetype=access_combined
| regex _raw="(?i)(union|select|insert|update|delete|drop|create|alter)"
| eval sqli_pattern=case(
    match(_raw, "(?i)union.*select"), "Union-based",
    match(_raw, "(?i)'\s*or\s*'"), "OR-based",
    match(_raw, "(?i)'\s*and\s*'"), "AND-based",
    1=1, "Other"
)
| stats count by src_ip, sqli_pattern
| sort -count

# 異常な404エラーの検出(ディレクトリ探索)
index=web status=404
| stats count by src_ip, uri_path
| where count > 10
| sort -count

3. 自作ログ分析システム

import json
import re
import sqlite3
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText

class SecurityLogAnalyzer:
    def __init__(self, db_path="security_logs.db"):
        self.db_path = db_path
        self.init_database()

        # 検知ルール
        self.rules = {
            'brute_force': {
                'pattern': r'Failed password',
                'threshold': 5,
                'time_window': 300
            },
            'sqli_attempt': {
                'pattern': r'(?i)(union|select|insert|update|delete|drop).*?(union|select|insert|update|delete|drop)',
                'threshold': 1,
                'time_window': 1
            },
            'directory_traversal': {
                'pattern': r'\.\./',
                'threshold': 3,
                'time_window': 60
            }
        }

    def init_database(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS security_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                source_ip TEXT,
                event_type TEXT,
                severity TEXT,
                details TEXT,
                raw_log TEXT
            )
        ''')

        conn.commit()
        conn.close()

    def analyze_log_line(self, log_line):
        timestamp = datetime.now()
        source_ip = self.extract_ip(log_line)

        # 各ルールで検査
        for rule_name, rule_config in self.rules.items():
            if re.search(rule_config['pattern'], log_line):
                self.record_event(timestamp, source_ip, rule_name, log_line)
                self.check_threshold(source_ip, rule_name, rule_config)

    def extract_ip(self, log_line):
        ip_pattern = r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b'
        match = re.search(ip_pattern, log_line)
        return match.group(1) if match else 'unknown'

    def record_event(self, timestamp, source_ip, event_type, raw_log):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT INTO security_events 
            (timestamp, source_ip, event_type, severity, raw_log)
            VALUES (?, ?, ?, ?, ?)
        ''', (timestamp, source_ip, event_type, 'medium', raw_log))

        conn.commit()
        conn.close()

    def check_threshold(self, source_ip, event_type, rule_config):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # 指定時間内のイベント数をカウント
        since_time = datetime.now() - timedelta(seconds=rule_config['time_window'])

        cursor.execute('''
            SELECT COUNT(*) FROM security_events 
            WHERE source_ip = ? AND event_type = ? AND timestamp > ?
        ''', (source_ip, event_type, since_time))

        count = cursor.fetchone()[0]
        conn.close()

        if count >= rule_config['threshold']:
            self.trigger_alert(source_ip, event_type, count)

    def trigger_alert(self, source_ip, event_type, count):
        alert_message = f"""
        SECURITY ALERT

        Event Type: {event_type}
        Source IP: {source_ip}
        Event Count: {count}
        Time: {datetime.now()}

        Immediate action may be required.
        """

        print(alert_message)
        self.send_email_alert(alert_message)
        self.auto_block_ip(source_ip)

    def send_email_alert(self, message):
        # メールアラートの送信
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'Security Alert'
            msg['From'] = 'security@example.com'
            msg['To'] = 'admin@example.com'

            server = smtplib.SMTP('localhost')
            server.send_message(msg)
            server.quit()
        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def auto_block_ip(self, ip):
        # 自動IPブロック
        import subprocess
        try:
            subprocess.run(['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'], 
                         check=True)
            print(f"Automatically blocked IP: {ip}")
        except subprocess.CalledProcessError as e:
            print(f"Failed to block IP {ip}: {e}")

    def generate_report(self, hours=24):
        """セキュリティレポートの生成"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        since_time = datetime.now() - timedelta(hours=hours)

        cursor.execute('''
            SELECT event_type, COUNT(*) as count 
            FROM security_events 
            WHERE timestamp > ? 
            GROUP BY event_type 
            ORDER BY count DESC
        ''', (since_time,))

        events = cursor.fetchall()

        cursor.execute('''
            SELECT source_ip, COUNT(*) as count 
            FROM security_events 
            WHERE timestamp > ? 
            GROUP BY source_ip 
            ORDER BY count DESC 
            LIMIT 10
        ''', (since_time,))

        top_ips = cursor.fetchall()
        conn.close()

        report = f"""
        Security Report - Last {hours} hours

        Event Summary:
        {chr(10).join([f"  {event[0]}: {event[1]}" for event in events])}

        Top Source IPs:
        {chr(10).join([f"  {ip[0]}: {ip[1]} events" for ip in top_ips])}
        """

        return report

# 使用例
analyzer = SecurityLogAnalyzer()

# ログファイルの監視
def monitor_logs():
    import subprocess

    process = subprocess.Popen(['tail', '-F', '/var/log/nginx/access.log'], 
                             stdout=subprocess.PIPE, text=True)

    for line in process.stdout:
        analyzer.analyze_log_line(line.strip())

# レポート生成
print(analyzer.generate_report(24))

リアルタイム監視とアラート

1. Prometheus + Grafana

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'log-exporter'
    static_configs:
      - targets: ['localhost:9116']

rule_files:
  - "security_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093
# security_rules.yml
groups:
- name: security_alerts
  rules:
  - alert: HighFailedLoginRate
    expr: rate(failed_login_total[5m]) > 0.1
    for: 1m
    labels:
      severity: warning
    annotations:
      summary: "High failed login rate detected"
      description: "Failed login rate is {{ $value }} per second"

  - alert: SQLInjectionAttempt
    expr: increase(sqli_attempts_total[1m]) > 0
    for: 0s
    labels:
      severity: critical
    annotations:
      summary: "SQL injection attempt detected"
      description: "{{ $value }} SQL injection attempts in the last minute"

2. Slack/Discord通知

import requests
import json

class AlertNotifier:
    def __init__(self, slack_webhook=None, discord_webhook=None):
        self.slack_webhook = slack_webhook
        self.discord_webhook = discord_webhook

    def send_slack_alert(self, message, severity='medium'):
        if not self.slack_webhook:
            return

        color_map = {
            'low': '#36a64f',      # green
            'medium': '#ff9800',   # orange  
            'high': '#f44336',     # red
            'critical': '#9c27b0'  # purple
        }

        payload = {
            "attachments": [
                {
                    "color": color_map.get(severity, '#ff9800'),
                    "title": f"🚨 Security Alert - {severity.upper()}",
                    "text": message,
                    "ts": int(time.time())
                }
            ]
        }

        requests.post(self.slack_webhook, json=payload)

    def send_discord_alert(self, message, severity='medium'):
        if not self.discord_webhook:
            return

        color_map = {
            'low': 0x36a64f,      # green
            'medium': 0xff9800,   # orange
            'high': 0xf44336,     # red
            'critical': 0x9c27b0  # purple
        }

        payload = {
            "embeds": [
                {
                    "title": f"🚨 Security Alert - {severity.upper()}",
                    "description": message,
                    "color": color_map.get(severity, 0xff9800),
                    "timestamp": datetime.now().isoformat()
                }
            ]
        }

        requests.post(self.discord_webhook, json=payload)

# 使用例
notifier = AlertNotifier(
    slack_webhook="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
    discord_webhook="https://discord.com/api/webhooks/YOUR/DISCORD/WEBHOOK"
)

def security_alert(event_type, source_ip, details):
    message = f"""
    **Event Type:** {event_type}
    **Source IP:** {source_ip}
    **Details:** {details}
    **Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    """

    severity = 'critical' if event_type == 'sqli_attempt' else 'medium'

    notifier.send_slack_alert(message, severity)
    notifier.send_discord_alert(message, severity)

ログローテーションとストレージ

1. logrotate設定

# /etc/logrotate.d/security-logs
/var/log/security/*.log {
    daily
    rotate 365
    compress
    delaycompress
    missingok
    notifempty
    create 644 root root
    postrotate
        /usr/bin/systemctl reload rsyslog > /dev/null 2>&1 || true
    endrotate
}

# アプリケーションログ
/var/log/app/*.log {
    weekly
    rotate 52
    compress
    delaycompress
    copytruncate
    missingok
    notifempty
}

2. 長期保存とアーカイブ

import gzip
import os
import shutil
from datetime import datetime, timedelta

class LogArchiver:
    def __init__(self, log_dir="/var/log", archive_dir="/var/log/archive"):
        self.log_dir = log_dir
        self.archive_dir = archive_dir
        os.makedirs(archive_dir, exist_ok=True)

    def archive_old_logs(self, days_old=30):
        """指定日数より古いログをアーカイブ"""
        cutoff_date = datetime.now() - timedelta(days=days_old)

        for root, dirs, files in os.walk(self.log_dir):
            for file in files:
                if file.endswith('.log'):
                    file_path = os.path.join(root, file)
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))

                    if file_mtime < cutoff_date:
                        self.compress_and_move(file_path)

    def compress_and_move(self, file_path):
        """ログファイルを圧縮してアーカイブディレクトリに移動"""
        filename = os.path.basename(file_path)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        archive_filename = f"{timestamp}_{filename}.gz"
        archive_path = os.path.join(self.archive_dir, archive_filename)

        with open(file_path, 'rb') as f_in:
            with gzip.open(archive_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        os.remove(file_path)
        print(f"Archived: {file_path} -> {archive_path}")

# 使用例
archiver = LogArchiver()
archiver.archive_old_logs(30)  # 30日より古いログをアーカイブ

ログ監視システムの構成

実際のインシデント分析例

ケーススタディ:SQLインジェクション攻撃の発見

# 1. 異常なアクセスパターンの発見
grep -E "(union|select)" /var/log/nginx/access.log | head -5

192.168.1.100 - - [10/Oct/2024:14:30:15 +0900] "GET /search?q=' UNION SELECT password FROM users-- HTTP/1.1" 200 1234
192.168.1.100 - - [10/Oct/2024:14:30:18 +0900] "GET /user?id=1' OR '1'='1 HTTP/1.1" 200 5678
192.168.1.100 - - [10/Oct/2024:14:30:20 +0900] "GET /admin?user=admin'-- HTTP/1.1" 403 234

# 2. 攻撃者の活動期間を特定
grep "192.168.1.100" /var/log/nginx/access.log | head -1 && grep "192.168.1.100" /var/log/nginx/access.log | tail -1

# 3. 影響を受けた可能性のあるページを特定
grep "192.168.1.100" /var/log/nginx/access.log | grep -E "(200|302)" | awk '{print $7}' | sort | uniq

# 4. データベースログで実際のクエリを確認
grep "SELECT.*FROM users" /var/log/mysql/mysql.log

# 5. アプリケーションログでエラーを確認
grep -i "sql.*error" /var/log/app/application.log

被害範囲の特定

def analyze_sqli_incident():
    """SQLインジェクションインシデントの分析"""

    # 1. 攻撃者のIPアドレスから全活動を抽出
    attacker_ip = "192.168.1.100"

    # 2. アクセスしたページを時系列で整理
    with open('/var/log/nginx/access.log', 'r') as f:
        attacker_requests = []
        for line in f:
            if attacker_ip in line:
                # タイムスタンプ、リクエスト、レスポンスコードを抽出
                parts = line.split(' ')
                timestamp = parts[3][1:]  # [を除去
                request = parts[6]
                status_code = parts[8]

                attacker_requests.append({
                    'timestamp': timestamp,
                    'request': request,
                    'status': status_code
                })

    # 3. 成功した攻撃(200, 302レスポンス)を特定
    successful_attacks = [req for req in attacker_requests 
                         if req['status'] in ['200', '302']]

    # 4. 影響を受けたデータを特定
    affected_tables = []
    for attack in successful_attacks:
        if 'users' in attack['request'].lower():
            affected_tables.append('users')
        if 'admin' in attack['request'].lower():
            affected_tables.append('admin')

    # 5. 報告書作成
    report = f"""
    SQLインジェクション攻撃分析報告書

    攻撃者IP: {attacker_ip}
    攻撃期間: {attacker_requests[0]['timestamp']} - {attacker_requests[-1]['timestamp']}
    総リクエスト数: {len(attacker_requests)}
    成功した攻撃: {len(successful_attacks)}
    影響を受けたテーブル: {list(set(affected_tables))}

    推奨対策:
    1. 該当IPアドレスのブロック
    2. 影響を受けたテーブルのデータ整合性確認
    3. アプリケーションのSQLインジェクション対策強化
    4. ログ監視ルールの強化
    """

    return report

print(analyze_sqli_incident())

まとめ

ログ監視・分析は、現代のサイバーセキュリティにおいて不可欠な要素です。

重要なポイント:

  1. 包括的なログ収集:Webサーバー、アプリケーション、DB、システムログ
  2. リアルタイム分析:自動化された攻撃パターン検知
  3. 適切なアラート:閾値設定と迅速な通知システム
  4. 長期保存:インシデント調査とコンプライアンス対応
  5. 継続的改善:検知ルールの調整と精度向上

特に重要なのは、ログ監視を自動化し、人的リソースに依存しない仕組みを構築することです。攻撃者は24時間365日活動するため、監視システムも同様に稼働し続ける必要があります。

また、ログ分析は「事後対応だけでなく予防」にも活用できます。攻撃の前兆を検知し、事前に対策を講じることで、被害を最小限に抑えることが可能です。

この記事で紹介した手法を参考に、効果的なログ監視・分析システムを構築し、セキュリティレベルの向上を図ってください。


コメント

タイトルとURLをコピーしました