Agent skill
klingai-audit-logging
Implement audit logging for Kling AI operations for compliance and security. Use when tracking API usage or preparing for audits. Trigger with phrases like 'klingai audit', 'kling ai audit log', 'klingai compliance log', 'video generation audit trail'.
Install this agent skill to your Project
npx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/main/plugins/saas-packs/klingai-pack/skills/klingai-audit-logging
SKILL.md
Kling AI Audit Logging
Overview
Compliance-grade audit logging for Kling AI API operations. Every task submission, status change, and credential usage is captured in tamper-evident structured logs.
Audit Event Schema
import json
import hashlib
import time
from datetime import datetime
from pathlib import Path
class AuditLogger:
"""Append-only audit log with integrity checksums."""
def __init__(self, log_dir: str = "audit"):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(exist_ok=True)
self._prev_hash = "genesis"
def _compute_hash(self, entry: dict) -> str:
raw = json.dumps(entry, sort_keys=True) + self._prev_hash
return hashlib.sha256(raw.encode()).hexdigest()[:16]
def log(self, event_type: str, actor: str, details: dict):
"""Write a tamper-evident audit entry."""
entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"event_type": event_type,
"actor": actor,
"details": details,
"prev_hash": self._prev_hash,
}
entry["hash"] = self._compute_hash(entry)
self._prev_hash = entry["hash"]
date = datetime.utcnow().strftime("%Y-%m-%d")
filepath = self.log_dir / f"audit-{date}.jsonl"
with open(filepath, "a") as f:
f.write(json.dumps(entry) + "\n")
return entry["hash"]
Audit Events for Kling AI
class KlingAuditClient:
"""Kling client with full audit trail."""
def __init__(self, base_client, audit: AuditLogger, actor: str = "system"):
self.client = base_client
self.audit = audit
self.actor = actor
def text_to_video(self, prompt: str, **kwargs):
# Log submission
self.audit.log("task_submitted", self.actor, {
"action": "text_to_video",
"model": kwargs.get("model", "kling-v2-master"),
"duration": kwargs.get("duration", 5),
"mode": kwargs.get("mode", "standard"),
"prompt_hash": hashlib.sha256(prompt.encode()).hexdigest()[:16],
"prompt_length": len(prompt),
})
result = self.client.text_to_video(prompt, **kwargs)
# Log completion
self.audit.log("task_completed", self.actor, {
"action": "text_to_video",
"status": "succeed",
"video_count": len(result.get("videos", [])),
})
return result
def log_auth_event(self, event: str, success: bool):
self.audit.log("auth_event", self.actor, {
"event": event,
"success": success,
"access_key_prefix": self.client.config.access_key[:8] + "...",
})
Audit Log Verification
def verify_audit_chain(log_file: str) -> bool:
"""Verify tamper-evidence of audit log chain."""
prev_hash = "genesis"
entries = []
with open(log_file) as f:
for line_num, line in enumerate(f, 1):
entry = json.loads(line)
entries.append(entry)
if entry["prev_hash"] != prev_hash:
print(f"Chain broken at line {line_num}: "
f"expected prev_hash={prev_hash}, got {entry['prev_hash']}")
return False
# Recompute hash
check_entry = {k: v for k, v in entry.items() if k != "hash"}
raw = json.dumps(check_entry, sort_keys=True) + prev_hash
expected_hash = hashlib.sha256(raw.encode()).hexdigest()[:16]
if entry["hash"] != expected_hash:
print(f"Hash mismatch at line {line_num}")
return False
prev_hash = entry["hash"]
print(f"Verified {len(entries)} entries -- chain intact")
return True
Audit Report Generator
def generate_audit_report(log_dir: str = "audit", days: int = 30) -> dict:
"""Generate compliance audit report."""
from collections import Counter
from datetime import timedelta
log_path = Path(log_dir)
events = []
cutoff = datetime.utcnow() - timedelta(days=days)
for filepath in sorted(log_path.glob("audit-*.jsonl")):
with open(filepath) as f:
for line in f:
entry = json.loads(line)
if entry["timestamp"] >= cutoff.isoformat():
events.append(entry)
event_types = Counter(e["event_type"] for e in events)
actors = Counter(e["actor"] for e in events)
report = {
"period_days": days,
"total_events": len(events),
"event_types": dict(event_types),
"unique_actors": len(actors),
"actors": dict(actors),
"first_event": events[0]["timestamp"] if events else None,
"last_event": events[-1]["timestamp"] if events else None,
}
print(f"\n=== Audit Report ({days} days) ===")
print(f"Total events: {report['total_events']}")
for event_type, count in event_types.most_common():
print(f" {event_type}: {count}")
print(f"Actors: {', '.join(actors.keys())}")
return report
Compliance Checklist
- All API calls logged with timestamp, actor, action
- Prompts stored as hashes (not plaintext) for privacy
- Audit chain integrity verifiable
- Logs retained for required period (typically 1-7 years)
- Log access restricted to authorized personnel
- Regular verification of chain integrity
Resources
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
dockerfile-generator
Dockerfile Generator - Auto-activating skill for DevOps Basics. Triggers on: dockerfile generator, dockerfile generator Part of the DevOps Basics skill category.
branch-naming-helper
Branch Naming Helper - Auto-activating skill for DevOps Basics. Triggers on: branch naming helper, branch naming helper Part of the DevOps Basics skill category.
readme-generator
Readme Generator - Auto-activating skill for DevOps Basics. Triggers on: readme generator, readme generator Part of the DevOps Basics skill category.
makefile-generator
Makefile Generator - Auto-activating skill for DevOps Basics. Triggers on: makefile generator, makefile generator Part of the DevOps Basics skill category.
gitignore-generator
Gitignore Generator - Auto-activating skill for DevOps Basics. Triggers on: gitignore generator, gitignore generator Part of the DevOps Basics skill category.
pre-commit-hook-setup
Pre Commit Hook Setup - Auto-activating skill for DevOps Basics. Triggers on: pre commit hook setup, pre commit hook setup Part of the DevOps Basics skill category.
Didn't find tool you were looking for?