Agent skill

incident-reporting

Construction safety incident reporting and analysis. Capture incidents, conduct investigations, track corrective actions, and analyze trends for prevention.

Stars 163
Forks 31

Install this agent skill to your Project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/incident-reporting

SKILL.md

Incident Reporting System

Overview

Comprehensive incident reporting system for construction safety. Capture near-misses, injuries, and property damage. Conduct root cause analysis and track corrective actions to prevent recurrence.

"Near-miss reporting prevents 90% of future serious incidents" — DDC Community

Incident Pyramid

                    △
                   /│\        Fatality (1)
                  / │ \
                 /  │  \      Serious Injury (10)
                /   │   \
               /    │    \    Minor Injury (30)
              /     │     \
             /      │      \  Near Miss (300)
            /       │       \
           /        │        \ Unsafe Acts (3000)
          ──────────┴──────────

Technical Implementation

python
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime, timedelta
import json

class IncidentType(Enum):
    NEAR_MISS = "near_miss"
    FIRST_AID = "first_aid"
    MEDICAL_TREATMENT = "medical_treatment"
    LOST_TIME = "lost_time"
    FATALITY = "fatality"
    PROPERTY_DAMAGE = "property_damage"
    ENVIRONMENTAL = "environmental"

class IncidentCategory(Enum):
    FALL = "fall"
    STRUCK_BY = "struck_by"
    CAUGHT_IN = "caught_in"
    ELECTROCUTION = "electrocution"
    VEHICLE = "vehicle"
    MATERIAL_HANDLING = "material_handling"
    TOOL_EQUIPMENT = "tool_equipment"
    SLIP_TRIP = "slip_trip"
    FIRE = "fire"
    CHEMICAL = "chemical"
    OTHER = "other"

class InvestigationStatus(Enum):
    REPORTED = "reported"
    UNDER_INVESTIGATION = "under_investigation"
    ROOT_CAUSE_IDENTIFIED = "root_cause_identified"
    CORRECTIVE_ACTIONS_ASSIGNED = "corrective_actions_assigned"
    IN_REMEDIATION = "in_remediation"
    CLOSED = "closed"

@dataclass
class Person:
    name: str
    company: str
    role: str
    contact: str
    years_experience: int = 0

@dataclass
class CorrectiveAction:
    id: str
    description: str
    assigned_to: str
    due_date: datetime
    status: str = "open"
    completed_date: Optional[datetime] = None
    verification_notes: str = ""

@dataclass
class Incident:
    id: str
    incident_type: IncidentType
    category: IncidentCategory
    date_time: datetime
    location: str
    project_id: str
    project_name: str

    # Description
    description: str
    immediate_actions: str

    # People involved
    injured_person: Optional[Person] = None
    witnesses: List[Person] = field(default_factory=list)
    reported_by: str = ""

    # Investigation
    status: InvestigationStatus = InvestigationStatus.REPORTED
    root_causes: List[str] = field(default_factory=list)
    contributing_factors: List[str] = field(default_factory=list)
    corrective_actions: List[CorrectiveAction] = field(default_factory=list)

    # Documentation
    photos: List[str] = field(default_factory=list)
    weather_conditions: str = ""
    equipment_involved: List[str] = field(default_factory=list)

    # Metrics
    days_lost: int = 0
    property_damage_cost: float = 0.0
    osha_recordable: bool = False

class IncidentManager:
    """Manage construction incident reporting and investigation."""

    # 5 Whys root cause categories
    ROOT_CAUSE_CATEGORIES = [
        "Training/Competency",
        "Procedures/Work Instructions",
        "Equipment/Tools",
        "Supervision",
        "Communication",
        "Housekeeping",
        "PPE",
        "Work Environment",
        "Physical/Mental State",
        "Management System"
    ]

    def __init__(self):
        self.incidents: Dict[str, Incident] = {}
        self.corrective_actions: Dict[str, CorrectiveAction] = {}

    def report_incident(self, incident_type: IncidentType,
                       category: IncidentCategory,
                       date_time: datetime,
                       location: str,
                       project_id: str,
                       project_name: str,
                       description: str,
                       immediate_actions: str,
                       reported_by: str,
                       injured_person: Dict = None) -> Incident:
        """Report new incident."""
        incident_id = f"INC-{datetime.now().strftime('%Y%m%d%H%M%S')}"

        injured = None
        if injured_person:
            injured = Person(**injured_person)

        incident = Incident(
            id=incident_id,
            incident_type=incident_type,
            category=category,
            date_time=date_time,
            location=location,
            project_id=project_id,
            project_name=project_name,
            description=description,
            immediate_actions=immediate_actions,
            reported_by=reported_by,
            injured_person=injured
        )

        # Auto-flag OSHA recordable
        if incident_type in [IncidentType.MEDICAL_TREATMENT,
                            IncidentType.LOST_TIME,
                            IncidentType.FATALITY]:
            incident.osha_recordable = True

        self.incidents[incident_id] = incident
        return incident

    def add_witness(self, incident_id: str, witness: Dict) -> Incident:
        """Add witness to incident."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident {incident_id} not found")

        self.incidents[incident_id].witnesses.append(Person(**witness))
        return self.incidents[incident_id]

    def conduct_investigation(self, incident_id: str,
                             root_causes: List[str],
                             contributing_factors: List[str]) -> Incident:
        """Record investigation findings."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident {incident_id} not found")

        incident = self.incidents[incident_id]
        incident.root_causes = root_causes
        incident.contributing_factors = contributing_factors
        incident.status = InvestigationStatus.ROOT_CAUSE_IDENTIFIED
        return incident

    def five_whys_analysis(self, incident_id: str, whys: List[str]) -> Dict:
        """Conduct 5 Whys analysis."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident {incident_id} not found")

        analysis = {
            "incident_id": incident_id,
            "analysis_date": datetime.now().isoformat(),
            "whys": []
        }

        for i, why in enumerate(whys):
            analysis["whys"].append({
                "level": i + 1,
                "question": f"Why #{i+1}?",
                "answer": why
            })

        # The last "why" is typically the root cause
        if whys:
            self.incidents[incident_id].root_causes.append(whys[-1])

        return analysis

    def assign_corrective_action(self, incident_id: str,
                                description: str,
                                assigned_to: str,
                                due_days: int = 7) -> CorrectiveAction:
        """Assign corrective action."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident {incident_id} not found")

        action_id = f"CA-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        action = CorrectiveAction(
            id=action_id,
            description=description,
            assigned_to=assigned_to,
            due_date=datetime.now() + timedelta(days=due_days)
        )

        self.incidents[incident_id].corrective_actions.append(action)
        self.incidents[incident_id].status = InvestigationStatus.CORRECTIVE_ACTIONS_ASSIGNED
        self.corrective_actions[action_id] = action
        return action

    def complete_corrective_action(self, action_id: str,
                                   verification_notes: str) -> CorrectiveAction:
        """Mark corrective action complete."""
        if action_id not in self.corrective_actions:
            raise ValueError(f"Corrective action {action_id} not found")

        action = self.corrective_actions[action_id]
        action.status = "completed"
        action.completed_date = datetime.now()
        action.verification_notes = verification_notes
        return action

    def get_incident_metrics(self, project_id: str = None,
                            start_date: datetime = None,
                            end_date: datetime = None) -> Dict:
        """Calculate incident metrics."""
        incidents = list(self.incidents.values())

        if project_id:
            incidents = [i for i in incidents if i.project_id == project_id]
        if start_date:
            incidents = [i for i in incidents if i.date_time >= start_date]
        if end_date:
            incidents = [i for i in incidents if i.date_time <= end_date]

        # Calculate metrics
        total = len(incidents)
        near_misses = len([i for i in incidents if i.incident_type == IncidentType.NEAR_MISS])
        first_aid = len([i for i in incidents if i.incident_type == IncidentType.FIRST_AID])
        recordables = len([i for i in incidents if i.osha_recordable])
        lost_time = len([i for i in incidents if i.incident_type == IncidentType.LOST_TIME])
        total_days_lost = sum(i.days_lost for i in incidents)

        # Category breakdown
        by_category = {}
        for cat in IncidentCategory:
            count = len([i for i in incidents if i.category == cat])
            if count > 0:
                by_category[cat.value] = count

        return {
            "total_incidents": total,
            "near_misses": near_misses,
            "first_aid_cases": first_aid,
            "osha_recordables": recordables,
            "lost_time_incidents": lost_time,
            "total_days_lost": total_days_lost,
            "by_category": by_category,
            "near_miss_ratio": near_misses / recordables if recordables else 0
        }

    def calculate_trir(self, hours_worked: int, project_id: str = None) -> float:
        """Calculate Total Recordable Incident Rate."""
        incidents = list(self.incidents.values())
        if project_id:
            incidents = [i for i in incidents if i.project_id == project_id]

        recordables = len([i for i in incidents if i.osha_recordable])

        if hours_worked == 0:
            return 0

        # TRIR = (Recordables × 200,000) / Hours Worked
        return (recordables * 200000) / hours_worked

    def calculate_dart(self, hours_worked: int, project_id: str = None) -> float:
        """Calculate Days Away, Restricted, or Transferred rate."""
        incidents = list(self.incidents.values())
        if project_id:
            incidents = [i for i in incidents if i.project_id == project_id]

        dart_cases = len([i for i in incidents
                         if i.incident_type in [IncidentType.LOST_TIME]])

        if hours_worked == 0:
            return 0

        return (dart_cases * 200000) / hours_worked

    def get_trend_analysis(self, months: int = 6) -> List[Dict]:
        """Analyze incident trends over time."""
        trends = []
        now = datetime.now()

        for i in range(months):
            month_start = datetime(now.year, now.month - i, 1) if now.month > i else datetime(now.year - 1, 12 - (i - now.month), 1)
            month_end = month_start.replace(day=28) + timedelta(days=4)
            month_end = month_end - timedelta(days=month_end.day)

            month_incidents = [inc for inc in self.incidents.values()
                             if month_start <= inc.date_time <= month_end]

            trends.append({
                "month": month_start.strftime("%Y-%m"),
                "total": len(month_incidents),
                "near_misses": len([i for i in month_incidents if i.incident_type == IncidentType.NEAR_MISS]),
                "recordables": len([i for i in month_incidents if i.osha_recordable])
            })

        return list(reversed(trends))

    def generate_incident_report(self, incident_id: str) -> str:
        """Generate detailed incident report."""
        if incident_id not in self.incidents:
            return "Incident not found"

        inc = self.incidents[incident_id]

        lines = [
            f"# Incident Report",
            f"",
            f"**Incident ID:** {inc.id}",
            f"**Type:** {inc.incident_type.value}",
            f"**Category:** {inc.category.value}",
            f"**Date/Time:** {inc.date_time.strftime('%Y-%m-%d %H:%M')}",
            f"**Location:** {inc.location}",
            f"**Project:** {inc.project_name}",
            f"**Status:** {inc.status.value}",
            f"**OSHA Recordable:** {'Yes' if inc.osha_recordable else 'No'}",
            f"",
            f"## Description",
            f"{inc.description}",
            f"",
            f"## Immediate Actions Taken",
            f"{inc.immediate_actions}",
            f"",
        ]

        if inc.injured_person:
            lines.extend([
                f"## Injured Person",
                f"- Name: {inc.injured_person.name}",
                f"- Company: {inc.injured_person.company}",
                f"- Role: {inc.injured_person.role}",
                f"- Experience: {inc.injured_person.years_experience} years",
                f""
            ])

        if inc.root_causes:
            lines.extend([
                f"## Root Causes",
                *[f"- {rc}" for rc in inc.root_causes],
                f""
            ])

        if inc.corrective_actions:
            lines.extend([
                f"## Corrective Actions",
                f"| Action | Assigned To | Due | Status |",
                f"|--------|-------------|-----|--------|"
            ])
            for ca in inc.corrective_actions:
                lines.append(f"| {ca.description} | {ca.assigned_to} | {ca.due_date.strftime('%Y-%m-%d')} | {ca.status} |")

        return "\n".join(lines)

Quick Start

python
# Initialize manager
manager = IncidentManager()

# Report near-miss
incident = manager.report_incident(
    incident_type=IncidentType.NEAR_MISS,
    category=IncidentCategory.STRUCK_BY,
    date_time=datetime.now(),
    location="Level 5, Grid C-4",
    project_id="PRJ-001",
    project_name="Office Tower",
    description="Unsecured tool fell from scaffold, landed 2 feet from worker",
    immediate_actions="Area cordoned off, toolbox talk conducted",
    reported_by="Site Foreman"
)

# 5 Whys analysis
analysis = manager.five_whys_analysis(incident.id, [
    "Tool fell from scaffold",
    "Tool was not secured",
    "No tool lanyard was used",
    "Worker not trained on tool tethering",
    "Tool tethering training not included in orientation"
])

# Assign corrective actions
manager.assign_corrective_action(
    incident.id,
    "Update orientation to include tool tethering training",
    "Safety Manager",
    due_days=14
)

manager.assign_corrective_action(
    incident.id,
    "Provide tool lanyards to all workers at height",
    "Procurement",
    due_days=7
)

# Get metrics
metrics = manager.get_incident_metrics()
print(f"Total Incidents: {metrics['total_incidents']}")
print(f"Near Miss Ratio: {metrics['near_miss_ratio']:.1f}")

# Generate report
print(manager.generate_incident_report(incident.id))

Requirements

bash
pip install (no external dependencies)

Didn't find tool you were looking for?

Be as detailed as possible for better results