Skip to content

Ejemplos de Plugins

Esta sección contiene ejemplos prácticos de plugins para diferentes casos de uso.

Plugin de Decodificación Base64

Propósito: Decodifica contenido base64 embebido en archivos

Estructura

decodebase64/
├── plugin.toml
├── plugin.py
└── schema.json

plugin.toml

toml
name = "decodebase64"
version = "1.0.0"
api_version = "1.x"
entry = "python3 plugin.py"
capabilities = ["transform"]
needs_content = true
reads_fs = false
timeout_ms = 5000
config_schema = "schema.json"

schema.json

json
{
  "type": "object",
  "properties": {
    "mode": {
      "type": "string",
      "enum": ["safe", "aggressive"],
      "default": "safe"
    },
    "min_length": {
      "type": "integer",
      "minimum": 16,
      "default": 64
    }
  }
}

plugin.py

python
#!/usr/bin/env python3
import sys
import json
import base64
import re

def send_response(id, result=None, error=None):
    """Emit one JSON-RPC 2.0 response as a single line on stdout.

    When ``error`` is not None the envelope carries it under "error";
    otherwise ``result`` is carried under "result". Stdout is flushed after
    the write.
    """
    envelope = {"jsonrpc": "2.0", "id": id}
    if error is None:
        envelope["result"] = result
    else:
        envelope["error"] = error
    print(json.dumps(envelope), file=sys.stdout, flush=True)

# Effective options. The defaults mirror schema.json; plugin.init overrides
# them with the "options" sent by the host.
# NOTE: "mode" is stored but never consulted by this example.
config = {"mode": "safe", "min_length": 64}

# Main loop: one JSON-RPC message per stdin line, one response per request.
for line in sys.stdin:
    # Skip blank lines instead of crashing inside json.loads.
    if not line.strip():
        continue
    try:
        msg = json.loads(line)
    except json.JSONDecodeError:
        # The request id is unknown at this point, so it is reported as null.
        send_response(None, None, {"code": -32700, "message": "parse error"})
        continue
    if not isinstance(msg, dict):
        send_response(None, None, {"code": -32600, "message": "invalid request"})
        continue

    mid, mth, p = msg.get("id"), msg.get("method"), msg.get("params")
    if not isinstance(p, dict):
        # Covers a missing "params" as well as an explicit null.
        p = {}

    if mth == "plugin.init":
        config.update(p.get("options") or {})
        send_response(mid, {
            "ok": True,
            "capabilities": ["transform"],
            "plugin_version": "1.0.0"
        })
    elif mth == "file.transform":
        # Compiled once per request rather than once per file. int() lets a
        # min_length that arrived as a string still work with bytes %d.
        pattern = re.compile(
            rb"[A-Za-z0-9+/]{%d,}={0,2}" % int(config["min_length"])
        )
        results, decoded = [], 0
        for f in p.get("files", []):
            content = base64.b64decode(f["content_b64"])
            matches = pattern.findall(content)

            if not matches:
                results.append({"path": f["path"], "actions": []})
                continue

            try:
                decoded_content = b"".join(base64.b64decode(m) for m in matches)
            except ValueError:
                # base64 raises binascii.Error (a ValueError subclass) for a
                # run with bad padding; the file is then reported untouched.
                results.append({"path": f["path"], "actions": []})
                continue

            results.append({
                "path": f["path"],
                "actions": ["decoded:base64"],
                "content_b64": base64.b64encode(decoded_content).decode(),
                "notes": [f"blocks:{len(matches)}"]
            })
            decoded += 1

        send_response(mid, {
            "files": results,
            # "ms" is a hard-coded placeholder; no timing is measured here.
            "metrics": {"decoded": decoded, "ms": 0}
        })
    elif mth == "plugin.ping":
        send_response(mid, {"pong": True})
    elif mth == "plugin.shutdown":
        # Leave the loop without replying; the process then exits.
        break
    else:
        send_response(mid, None, {
            "code": 1002,
            "message": "unknown method",
            "data": {"method": mth}
        })

Uso

bash
rootcause ./code --plugin ./plugins/decodebase64 --plugin-opt decodebase64.mode=aggressive

Analizador TypeScript

Propósito: Detecta uso peligroso de eval() en archivos TypeScript

plugin.toml

toml
name = "ts-eval-detector"
version = "1.0.0"
api_version = "1.x"
entry = "python3 plugin.py"
capabilities = ["analyze"]
needs_content = true
reads_fs = false
timeout_ms = 3000

plugin.py

python
#!/usr/bin/env python3
import base64
import hashlib
import json
import re
import sys

def send_response(id, result=None, error=None):
    """Serialize a JSON-RPC 2.0 reply and write it to stdout as one line.

    The reply holds an "error" member when ``error`` is given, and a
    "result" member (possibly null) otherwise. The stream is flushed after
    the write.
    """
    key, value = ("result", result) if error is None else ("error", error)
    payload = json.dumps({"jsonrpc": "2.0", "id": id, key: value})
    sys.stdout.write(f"{payload}\n")
    sys.stdout.flush()

# Matches a call to eval that is not the tail of a longer identifier, so
# "retrieval(" or "$eval(" are not reported. Whitespace before "(" is allowed.
EVAL_CALL = re.compile(r"(?<![\w$])eval\s*\(")

# Main loop: one JSON-RPC message per stdin line, one response per request.
for raw in sys.stdin:
    # Skip blank lines instead of crashing inside json.loads.
    if not raw.strip():
        continue
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        # The request id is unknown at this point, so it is reported as null.
        send_response(None, None, {"code": -32700, "message": "parse error"})
        continue
    if not isinstance(msg, dict):
        send_response(None, None, {"code": -32600, "message": "invalid request"})
        continue

    mid, mth, p = msg.get("id"), msg.get("method"), msg.get("params")
    if not isinstance(p, dict):
        # Covers a missing "params" as well as an explicit null.
        p = {}

    if mth == "plugin.init":
        send_response(mid, {
            "ok": True,
            "capabilities": ["analyze"],
            "plugin_version": "1.0.0"
        })
    elif mth == "file.analyze":
        findings = []
        for f in p.get("files", []):
            if f.get("language") != "typescript":
                continue

            # errors="replace": one badly encoded file must not abort the
            # whole plugin with a UnicodeDecodeError.
            content = base64.b64decode(f["content_b64"]).decode(
                "utf-8", errors="replace"
            )

            # Only the first eval call on each line is reported.
            for lineno, text in enumerate(content.split("\n"), 1):
                hit = EVAL_CALL.search(text)
                if hit is None:
                    continue

                # The fingerprint hashes path, line number and raw line text.
                fingerprint = hashlib.sha256(
                    f"{f['path']}:{lineno}:{text}".encode()
                ).hexdigest()

                findings.append({
                    "rule_id": "ts.no-eval",
                    "severity": "MEDIUM",
                    "file": f["path"],
                    "line": lineno,
                    "column": hit.start() + 1,  # 1-based column of "eval"
                    "excerpt": text.strip(),
                    "message": "Evita el uso de eval() en TypeScript",
                    "remediation": "Usa Function() o un parser apropiado",
                    "fingerprint": f"sha256:{fingerprint}"
                })

        send_response(mid, {"findings": findings})
    elif mth == "plugin.ping":
        send_response(mid, {"pong": True})
    elif mth == "plugin.shutdown":
        # Leave the loop without replying; the process then exits.
        break
    else:
        send_response(mid, None, {
            "code": 1002,
            "message": "unknown method"
        })

Generador de Reportes Personalizado

Propósito: Genera reportes en formato HTML personalizado

plugin.toml

toml
name = "html-reporter"
version = "1.0.0"
api_version = "1.x"
entry = "python3 plugin.py"
capabilities = ["report"]
needs_content = false
reads_fs = false
timeout_ms = 5000

plugin.py

python
#!/usr/bin/env python3
import sys
import json
import base64
from datetime import datetime

def send_response(id, result=None, error=None):
    """Write a JSON-RPC 2.0 response line to stdout, then flush.

    ``error`` takes precedence: if it is not None it is sent under "error";
    otherwise ``result`` is sent under "result".
    """
    message = {"jsonrpc": "2.0", "id": id}
    if error is not None:
        message["error"] = error
    else:
        message["result"] = result
    out = sys.stdout
    out.write(json.dumps(message))
    out.write("\n")
    out.flush()

def generate_html_report(findings):
    """Render the findings as one standalone HTML document.

    Every value taken from a finding is HTML-escaped before it is
    interpolated. Excerpts are raw source code and routinely contain ``<``,
    ``>`` and ``&``, which would otherwise break the markup or inject script
    into the report.

    Args:
        findings: Sequence of dicts. The keys read are "severity", "file",
            "line", "column", "message", "remediation" and "excerpt"; a
            missing key renders as an empty string.

    Returns:
        The complete HTML document as a str.
    """
    # Local import of the function only: this block uses no module-level
    # `html` name, and importing just `escape` keeps it that way.
    from html import escape

    def cell(finding, key):
        """Return the finding's value for ``key`` as HTML-safe text."""
        return escape(str(finding.get(key, "")))

    # One timestamp so the title and the body can never disagree.
    now = datetime.now()
    title_stamp = now.strftime('%Y-%m-%d %H:%M')
    body_stamp = now.strftime('%Y-%m-%d %H:%M:%S')

    parts = [f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Reporte RootCause - {title_stamp}</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .finding {{ border: 1px solid #ccc; margin: 10px 0; padding: 15px; }}
            .high {{ border-left: 5px solid #ff4444; }}
            .medium {{ border-left: 5px solid #ffaa00; }}
            .low {{ border-left: 5px solid #44aa44; }}
            .file {{ font-weight: bold; color: #0066cc; }}
            .message {{ margin: 10px 0; }}
            .remediation {{ background: #f0f0f0; padding: 10px; margin: 10px 0; }}
        </style>
    </head>
    <body>
        <h1>Reporte de Análisis RootCause</h1>
        <p>Generado: {body_stamp}</p>
        <p>Total de hallazgos: {len(findings)}</p>
    """]

    for finding in findings:
        # The lower-cased severity doubles as the CSS class (.high/.medium/.low).
        severity_class = escape(str(finding.get("severity", "")).lower())
        parts.append(f"""
        <div class="finding {severity_class}">
            <div class="file">{cell(finding, "file")}:{cell(finding, "line")}:{cell(finding, "column")}</div>
            <div class="message"><strong>{cell(finding, "message")}</strong></div>
            <div class="remediation"><strong>Recomendación:</strong> {cell(finding, "remediation")}</div>
            <pre>{cell(finding, "excerpt")}</pre>
        </div>
        """)

    parts.append("</body></html>")
    # Joined once instead of growing a string with repeated +=.
    return "".join(parts)

# Main loop: one JSON-RPC message per stdin line, one response per request.
for line in sys.stdin:
    # Skip blank lines instead of crashing inside json.loads.
    if not line.strip():
        continue
    try:
        msg = json.loads(line)
    except json.JSONDecodeError:
        # The request id is unknown at this point, so it is reported as null.
        send_response(None, None, {"code": -32700, "message": "parse error"})
        continue
    if not isinstance(msg, dict):
        send_response(None, None, {"code": -32600, "message": "invalid request"})
        continue

    mid, mth, p = msg.get("id"), msg.get("method"), msg.get("params")
    if not isinstance(p, dict):
        # Covers a missing "params" as well as an explicit null.
        p = {}

    if mth == "plugin.init":
        send_response(mid, {
            "ok": True,
            "capabilities": ["report"],
            "plugin_version": "1.0.0"
        })
    elif mth == "report.generate":
        findings = p.get("findings") or []
        html_content = generate_html_report(findings)

        # Save a local copy in the current working directory. The response
        # below carries the same content, so a failed write is logged to
        # stderr and the request is still answered.
        try:
            with open("rootcause-report.html", "w", encoding="utf-8") as f:
                f.write(html_content)
        except OSError as exc:
            print(
                f"html-reporter: could not write rootcause-report.html: {exc}",
                file=sys.stderr,
            )

        send_response(mid, {
            "files": [{
                "path": "rootcause-report.html",
                "content_b64": base64.b64encode(html_content.encode()).decode(),
                "format": "html"
            }],
            "metrics": {
                "findings_processed": len(findings),
                "files_generated": 1
            }
        })
    elif mth == "plugin.ping":
        send_response(mid, {"pong": True})
    elif mth == "plugin.shutdown":
        # Leave the loop without replying; the process then exits.
        break
    else:
        send_response(mid, None, {
            "code": 1002,
            "message": "unknown method"
        })

Plugin de Descubrimiento

Propósito: Encuentra archivos adicionales para analizar

plugin.toml

toml
name = "config-finder"
version = "1.0.0"
api_version = "1.x"
entry = "python3 plugin.py"
capabilities = ["discover"]
needs_content = false
reads_fs = true
timeout_ms = 2000

plugin.py

python
#!/usr/bin/env python3
import sys
import json
import os
import glob

def send_response(id, result=None, error=None):
    """Send a single-line JSON-RPC 2.0 response on stdout and flush it.

    A non-None ``error`` is sent under the "error" member; in every other
    case ``result`` is sent under the "result" member.
    """
    failed = error is not None
    body = dict(jsonrpc="2.0", id=id)
    body["error" if failed else "result"] = error if failed else result
    sys.stdout.write("".join((json.dumps(body), "\n")))
    sys.stdout.flush()

# Main loop: one JSON-RPC message per stdin line, one response per request.
for line in sys.stdin:
    # Skip blank lines instead of crashing inside json.loads.
    if not line.strip():
        continue
    try:
        msg = json.loads(line)
    except json.JSONDecodeError:
        # The request id is unknown at this point, so it is reported as null.
        send_response(None, None, {"code": -32700, "message": "parse error"})
        continue
    if not isinstance(msg, dict):
        send_response(None, None, {"code": -32600, "message": "invalid request"})
        continue

    mid, mth, p = msg.get("id"), msg.get("method"), msg.get("params")
    if not isinstance(p, dict):
        # Covers a missing "params" as well as an explicit null.
        p = {}

    if mth == "plugin.init":
        send_response(mid, {
            "ok": True,
            "capabilities": ["discover"],
            "plugin_version": "1.0.0"
        })
    elif mth == "discover.files":
        # Fall back to the current directory when no root is given (or null).
        workspace = p.get("workspace_root") or "."

        # Glob patterns for configuration files.
        config_patterns = [
            "**/*.config.js",
            "**/*.config.ts",
            "**/webpack.config.*",
            "**/vite.config.*",
            "**/.env*",
            "**/docker-compose.yml",
            "**/Dockerfile*"
        ]

        # The patterns overlap (webpack.config.js matches two of them), so
        # paths are collected as dict keys: de-duplicated, first-seen order.
        found = {}
        for pattern in config_patterns:
            for path in glob.glob(os.path.join(workspace, pattern), recursive=True):
                # Patterns such as "**/.env*" can also match directories.
                if os.path.isfile(path):
                    found.setdefault(path, None)

        send_response(mid, {
            "files": [{"path": path} for path in found],
            "exclude_patterns": ["**/node_modules/**", "**/.git/**"]
        })
    elif mth == "plugin.ping":
        send_response(mid, {"pong": True})
    elif mth == "plugin.shutdown":
        # Leave the loop without replying; the process then exits.
        break
    else:
        send_response(mid, None, {
            "code": 1002,
            "message": "unknown method"
        })

Uso de los Ejemplos

bash
# Instalar plugins
mkdir -p ~/.config/rootcause/plugins
cp -r decodebase64 ~/.config/rootcause/plugins/
cp -r ts-eval-detector ~/.config/rootcause/plugins/
cp -r html-reporter ~/.config/rootcause/plugins/

# Ejecutar con múltiples plugins
rootcause ./proyecto \
  --plugin decodebase64 \
  --plugin ts-eval-detector \
  --plugin html-reporter \
  --plugin-opt decodebase64.mode=aggressive

RootCause - Modular Static Analysis Engine