- מפת Pipeline מתועדת — דיאגרמת שלבים עם Input/Output לכל שלב ומפרט Stage Specification
- Orchestrator עובד ב-Python או TypeScript — קוד שמנהל את רצף השלבים, מעביר נתונים, ומטפל בשגיאות
- מנגנון Checkpoint — שמירת מצב אחרי כל שלב מוצלח, עם יכולת Resume מנקודת כשל
- Error Matrix מתועד — טבלה שמגדירה לכל שלב מה קורה כשיש שגיאה (retry, skip, abort, escalate)
- מערכת לוגים מובנית — structured logging עם זמנים, עלויות, טוקנים, ותוצאות לכל שלב
- Pipeline מרובה שלבים עובד — לפחות 4 שלבים, עם error handling, logging, ו-checkpoints
- מסמך Pipeline Patterns — 3 תבניות pipeline לתחומים שונים, מותאמות לדומיין שלכם
- תוכנית Scaling — מסמך שמגדיר איך להריץ את ה-pipeline שלכם בנפח גבוה עם rate limits ו-budgets
- תוכלו לעצב pipeline מרובה שלבים מאפס — מהגדרת הפלט הסופי ועד כל שלב ביניים
- תוכלו לבנות orchestrator שמנהל שלבים, מעביר נתונים, ומתאושש משגיאות אוטומטית
- תוכלו להחליט מתי להשתמש ב-pipeline (סדרתי) מול team (מקבילי) מול script (חד-שלבי)
- תוכלו לנתח עלויות pipeline ולזהות שלבים יקרים שדורשים אופטימיזציה
- תוכלו להרחיב pipeline שעובד לנפח גבוה עם queues, rate limits, ו-cost budgeting
- פרקים קודמים: פרק 3 (Agent SDK — Python) או פרק 4 (Agent SDK — TypeScript) — הבנת Agent, Tools, Results. פרק 5 (Agent Teams) — הבנת ההבדל בין עבודה מקבילית לסדרתית
- כלים נדרשים: Python 3.11+ או Node.js 18+, מפתח ANTHROPIC_API_KEY, עורך קוד (VS Code מומלץ), Claude Code v2.1.81+
- ידע נדרש: כתיבת סוכנים עם ה-Agent SDK, async/await, קריאה וכתיבה לקבצים, JSON
- זמן משוער: 4-5 שעות | עלות משוערת: 40-100 ₪ (~$10-25, ריצות pipeline צורכות הכי הרבה טוקנים)
בפרק 5 בניתם צוותי סוכנים — Agent Teams שעובדים במקביל על משימות מורכבות, עם Lead Agent שמתאם ומסנתז תוצאות. עכשיו אנחנו עוברים מ-"עבודה במקביל" ל-"עבודה ברצף" — Pipelines שבהם הפלט של שלב אחד הופך לקלט של השלב הבא, כמו פס ייצור במפעל. בפרק 7 ניקח את ה-Pipelines האלה ונריץ אותם בתוך Docker containers ושרתים מרוחקים — הצעד האחרון לפני שזה באמת production.
| מונח (English) | תרגום | הסבר |
|---|---|---|
| Pipeline | צינור עבודה | רצף אוטומטי של שלבים שבו הפלט של שלב אחד הופך לקלט של השלב הבא — כמו פס ייצור |
| Orchestrator | מנצח / מתזמר | הקוד שמנהל את ה-pipeline: מריץ שלבים בסדר, מעביר נתונים, מטפל בשגיאות. זה לא Claude — זה הקוד שלכם |
| Stage | שלב | יחידת עבודה בודדת ב-pipeline — מקבלת input מוגדר, מריצה סוכן, ומייצרת output מוגדר |
| Checkpoint | נקודת שמירה | שמירת מצב ה-pipeline לדיסק אחרי כל שלב מוצלח — מאפשרת המשך מנקודת הכשל במקום התחלה מחדש |
| Exponential Backoff | השהיה מעריכית | אסטרטגיית retry שבה זמן ההמתנה גדל בין ניסיונות: 1 שנייה, 2, 4, 8... מונע הצפה של ה-API |
| Data Flow | זרימת נתונים | הדרך שבה מידע עובר בין שלבי ה-pipeline — בזיכרון, דרך קבצים, או שילוב של שניהם |
| Rollback | חזרה לגרסה קודמת | שחזור ה-pipeline לגרסה קודמת שעבדה כשגרסה חדשה מייצרת תוצאות גרועות יותר |
| Backpressure | לחץ לאחור | מנגנון שמאט את קצב הקלט כשהמערכת לא מספיקה לעבד — מונע קריסה מעומס |
| Queue | תור | מבנה נתונים שמחכה — items נכנסים מצד אחד ויוצאים מצד שני. Redis, RabbitMQ, או SQS לדוגמה |
| Structured Logging | לוגים מובנים | כתיבת לוגים בפורמט JSON עם שדות קבועים (זמן, שלב, עלות, תוצאה) — מאפשר חיפוש וניתוח אוטומטי |
מה זה Pipeline בכלל?
בואו נתחיל מהמושג הבסיסי: Pipeline (צינור עבודה) הוא רצף אוטומטי של שלבים שבו הפלט של שלב אחד הופך לקלט של השלב הבא. זה כמו פס ייצור במפעל — כל תחנה עושה את שלה, מעבירה למוסעה הבאה, ובסוף יוצא מוצר מוגמר.
למה זה חשוב? כי עד עכשיו בנינו סוכנים בודדים (פרקים 3-4) וצוותי סוכנים (פרק 5). סוכן בודד מצוין למשימה ממוקדת. צוות מצוין למשימות שאפשר לפצל ולהריץ במקביל. אבל הרבה תהליכים אמיתיים הם סדרתיים מטבעם — אי אפשר לערוך טקסט שעדיין לא נכתב, אי אפשר לבדוק קוד שעדיין לא נוצר. לזה צריך pipeline.
ההבדל הקריטי בין pipeline לבין "שלוש פקודות שרצות בזו אחר זו": pipeline מוגדר היטב — כל שלב יודע מה הוא מקבל, מה הוא מייצר, ומה קורה כשהוא נכשל. יש לו error handling, logging, checkpoints. הוא מתוכנן להיות אמין ולרוץ שוב ושוב — לא סקריפט חד-פעמי שעובד "אם הכל בסדר".
| מבנה | מה זה | מתי להשתמש | דוגמה |
|---|---|---|---|
| Script (פרק 1) | קריאה אחת ל-Claude, תוצאה אחת | משימה פשוטה שלא צריכה שלבים | "תרגם את הקובץ הזה" |
| Pipeline (הפרק הזה) | שרשרת קריאות — כל אחת בונה על הקודמת | תהליך סדרתי עם תלויות בין שלבים | "תחקור → תכתוב outline → תכתוב מאמר → תערוך → תפרסם" |
| Team (פרק 5) | כמה סוכנים במקביל + Lead שמסנתז | תת-משימות עצמאיות שאפשר לפצל | "שלושה חוקרים בודקים באג מזוויות שונות" |
הכלל: אם שלב B תלוי בפלט של שלב A — זה pipeline. אם שלבים יכולים לרוץ בלי תלות — זה team. אם יש שלב אחד — זה script.
דוגמאות ל-pipelines מהעולם האמיתי:
- Code Generation Pipeline: Requirements → Design → Implementation → Testing → Review → Documentation → Deploy
- Content Publishing Pipeline: Research → Brief → Draft → Edit → SEO → Publish
- Data Processing Pipeline: Ingest → Validate → Clean → Transform → Analyze → Report
- DevOps Pipeline: Monitor → Detect → Diagnose → Fix → Test → Deploy → Verify
- Customer Support Pipeline: Receive → Classify → Research → Draft Response → Review → Send
Pipeline מרובה שלבים מייצר תוצאות טובות משמעותית מקריאה אחת ל-Claude, גם כשמשקיעים הרבה בפרומפט — מכיוון שכל שלב ממוקד במשימה אחת, בהקשר נקי, עם הנחיות מדויקות. pipeline בן 7 שלבים לכתיבת תוכן מייצר פרקים של 5,000-10,000 מילים ב-15-25 דקות, בעלות של $3-8 לפרק (לפי אומדן עבור Opus 4.6 נכון למרץ 2026). התוצאה עולה בהרבה על single-prompt generation כי ה-pipeline תופס שגיאות שקריאה יחידה מפספסת.
Claude Code עצמו מספק כלים מובנים שרלוונטיים ישירות ל-pipelines. /batch (פברואר 2026) מפרק עבודה ל-30 יחידות, כל אחת ב-git worktree מבודד עם סוכן עצמאי שפותח PR. Hooks (מגרסה 2.0, ספטמבר 2025) מאפשרים להפעיל סקריפטים אוטומטית לפני/אחרי כל tool call — quality gate מושלם בין שלבי pipeline. Skills (SKILL.md) מאפשרים לארוז שלב pipeline כ-/slash-command עם frontmatter שמגדיר כלים, מודל, ו-effort level. וכמובן, Agent SDK (Python v0.1.48, TypeScript v0.2.71 נכון למרץ 2026) הוא הבסיס לבניית pipeline stages פרוגרמטיים — אותם כלים וסוכנים כמו ב-Claude Code, ארוזים כספרייה.
חשבו על תהליך עבודה שאתם עושים שוב ושוב — כתיבת דוח, יצירת מצגת, בדיקת קוד, או כל דבר אחר. רשמו אותו כרצף שלבים: "קודם אני עושה X, אחר כך Y, אחר כך Z." ספרו כמה שלבים יצאו. אם 3-10 שלבים — מצוין, יש לכם מועמד מושלם ל-pipeline.
פתחו Claude Code והקלידו /batch. קראו את התיאור שמופיע. חשבו: האם התהליך שזיהיתם למעלה מתאים ל-/batch (משימות עצמאיות שאפשר לפצל) או ל-pipeline מותאם אישית (שלבים סדרתיים עם תלויות)? רשמו את התשובה — היא תנחה את הבחירה שלכם בהמשך הפרק.
עיצוב Pipelines מרובי שלבים
הכלל הראשון והחשוב ביותר בעיצוב pipeline: תתחילו מהסוף. מה התוצר הסופי? מאמר מפורסם? קוד שעבר code review? דוח מנותח? הגדירו את הפלט הסופי בצורה מדויקת — ואז עבדו אחורה.
נניח שהתוצר הסופי הוא "מאמר בלוג מפורסם." עובדים אחורה: מה צריך בשביל לפרסם? מאמר ערוך, אופטימיזציית SEO, תמונות. מה צריך בשביל מאמר ערוך? טיוטה שעברה עריכה. מה צריך בשביל טיוטה? outline מפורט + מחקר. מה צריך בשביל outline? brief עם נושא, קהל יעד, וטון. ככה מגלים את השלבים — מהסוף להתחלה.
Stage Specification Template — תבנית מפרט שלב
לכל שלב ב-pipeline, כתבו מפרט שכולל שישה דברים:
| שדה | מה לכתוב | דוגמה |
|---|---|---|
| Name | שם השלב | Research Stage |
| Input | מה השלב מקבל מהשלב הקודם או מהטריגר | Content brief (JSON) with topic, audience, tone |
| Agent Config | מודל, כלים, max_turns, הנחיות ספציפיות | Sonnet 4.6, WebSearch, maxTurns: 15, "Find 5 sources" |
| Output | מה השלב מייצר בדיוק | research.json with sources[], statistics[], key_findings[] |
| Success Criteria | איך יודעים שהשלב הצליח | At least 3 sources found, output JSON is valid |
| Failure Action | מה קורה כשנכשל — retry, skip, abort, escalate | Retry with different search terms (max 2 retries), then abort |
5-8 שלבים זה ה-sweet spot לרוב ה-pipelines. פחות מ-3 שלבים — כנראה שמספיק script פשוט. יותר מ-10 שלבים — כדאי לפצל ל-sub-pipelines. כל שלב שלא מוסיף ערך ברור (ולידציה, שיפור, פורמט) הוא תקורה מיותרת שעולה זמן ועלות.
קחו את תהליך העבודה שזיהיתם קודם. לכל שלב, מלאו את ה-Stage Specification Template — שם, input, output, success criteria, failure action. אל תדלגו על failure action — זה מה שהופך pipeline לאמין.
מפתחים רבים מגדירים שלבים עם input ו-output, אבל שוכחים לשאול "איך יודעים שזה עבד?" בלי success criteria, ה-pipeline ממשיך עם פלט גרוע ושגיאות מצטברות. שלב Research שמצא 0 מקורות? ה-pipeline ממשיך לכתוב מאמר ללא מקורות. הפתרון: לכל שלב, הגדירו בדיקה אוטומטית — JSON תקין? מספיק תוצאות? אורך טקסט סביר? בדקו לפני שמעבירים לשלב הבא.
מפתחים רבים מתחילים לכתוב orchestrator מלא מאפס, עם queue, retry logic, ו-logging — כשלפעמים מספיק להשתמש ב-/batch של Claude Code שכבר עושה את רוב העבודה (פירוק למשימות, worktrees מבודדים, PR לכל יחידה). לפני שמתחילים לבנות, שאלו: "האם /batch או /loop עם skills מספיקים למקרה שלי?" אם כן — חסכתם שבועות של פיתוח. בנו custom pipeline רק כשצריך שליטה מלאה על flow, error handling, או שילוב עם מערכות חיצוניות.
עקרון הממשק הנקי — Clean Interface Principle
כל שלב ב-pipeline צריך להתנהג כמו קופסה שחורה: input מוגדר → עיבוד → output מוגדר. השלב הבא לא צריך לדעת איך השלב הקודם עבד — רק מה הוא ייצר. זה מאפשר:
- החלפה: אפשר להחליף שלב (למשל מודל אחר) בלי לשנות את שאר ה-pipeline
- בדיקה: אפשר לבדוק כל שלב בנפרד עם input מוק
- מקביליות: אם שני שלבים מקבלים את אותו input, אפשר להריץ אותם במקביל
- שימוש חוזר: שלב "Research" שעובד טוב אפשר להשתמש בו ב-pipelines אחרים
בפועל, "ממשק נקי" אומר שאתם מגדירים חוזה (contract) לכל שלב: "השלב הזה מקבל JSON עם שדות X, Y, Z ומחזיר JSON עם שדות A, B, C." אם שלב אחד מפר את החוזה — ולידציה תופסת את זה לפני שהשלב הבא רואה פלט פגום. זה הבסיס של pipeline אמין.
לכל שלב ב-pipeline שלכם, כתבו משפט אחד שמתאר את ה-"חוזה": "מקבל: ___. מחזיר: ___." וודאו שה-output של שלב N תואם בדיוק ל-input של שלב N+1. אם יש פער — מצאתם באג בעיצוב.
דפוס ה-Orchestrator — מנצח התזמורת
ה-Orchestrator הוא הקוד שלכם שמנהל את ה-pipeline. זה לא Claude. זה לא סוכן. זה Python script או TypeScript module שיודע: איזה שלבים לרוץ, באיזה סדר, איך להעביר נתונים ביניהם, ומה לעשות כשמשהו נכשל.
למה ה-orchestrator חייב להיות קוד דטרמיניסטי ולא סוכן AI? כי:
- אמינות: קוד רגיל לא "מחליט" לדלג על שלבים כי הוא חושב שהם מיותרים
- ביצועים: ניהול flow לא צריך לצרוך טוקנים — זו פעולה זולה שמחשב עושה מהר
- שליטה: אתם קובעים את הלוגיקה, לא מודל שהפלט שלו לא דטרמיניסטי
- דיבוג: קל לזהות בדיוק איפה pipeline נתקע כשה-flow logic הוא קוד פשוט
Separation of Concerns (הפרדת אחריות) — שלוש שכבות:
| שכבה | אחריות | דוגמה |
|---|---|---|
| Orchestrator | Flow control, error handling, logging | "הרץ שלב 1, בדוק הצלחה, העבר output לשלב 2" |
| Stages | Agent config, prompt engineering, output parsing | "הגדר סוכן Sonnet עם WebSearch, תן לו brief, פרסר JSON" |
| Agents | Tool use, reasoning, execution | "Claude מחפש באינטרנט, קורא קבצים, מייצר תוכן" |
בואו נראה את זה בפרקטיקה. הנה stage wrapper שמראה את ההפרדה — ה-stage מגדיר את הסוכן, ה-orchestrator מנהל את ה-flow:
# stage definition — each stage is a self-contained unit
def create_research_stage():
"""Define the Research stage with its agent configuration"""
async def execute(input_data: dict) -> dict:
from claude_agent_sdk import Agent
agent = Agent(
model="claude-sonnet-4-6",
tools=["WebSearch", "WebFetch", "Write"],
max_turns=15,
instructions="You are a research agent. Find 5+ sources on the given topic."
)
prompt = f"Research this topic: {input_data['topic']}\n"
prompt += f"Focus areas: {', '.join(input_data.get('focus_areas', []))}"
result = await agent.run(prompt)
return {
"output": {
"sources": result.structured_output.get("sources", []),
"statistics": result.structured_output.get("statistics", []),
"key_findings": result.structured_output.get("key_findings", [])
},
"cost": result.cost_usd,
"tokens": result.total_tokens
}
return {
"name": "research",
"execute": execute,
"on_failure": "retry",
"max_retries": 2
}
שימו לב: ה-stage לא יודע שום דבר על שלבים אחרים. הוא לא יודע אם הוא שלב 2 או שלב 5. הוא מקבל input, מריץ סוכן, ומחזיר output. ה-orchestrator הוא זה שמחבר את הכל.
הנה מבנה orchestrator בסיסי ב-Python:
# pipeline_orchestrator.py
import json
import time
import logging
from pathlib import Path
from datetime import datetime
class PipelineOrchestrator:
def __init__(self, name: str, stages: list, output_dir: str = "./output"):
self.name = name
self.stages = stages
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
self.logger = self._setup_logging()
def _setup_logging(self):
logger = logging.getLogger(self.name)
logger.setLevel(logging.INFO)
handler = logging.FileHandler(
self.output_dir / f"run_{self.run_id}.log"
)
handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)s | %(message)s'
))
logger.addHandler(handler)
return logger
def run(self, initial_input: dict) -> dict:
data = initial_input
total_cost = 0
start_time = time.time()
self.logger.info(f"Pipeline '{self.name}' started | Run: {self.run_id}")
for i, stage in enumerate(self.stages):
stage_name = stage["name"]
self.logger.info(f"Stage {i+1}/{len(self.stages)}: {stage_name} | START")
try:
result = stage["execute"](data)
data = result["output"]
total_cost += result.get("cost", 0)
# Checkpoint
self._save_checkpoint(i, stage_name, data)
self.logger.info(
f"Stage {stage_name} | DONE | "
f"cost=${result.get('cost', 0):.3f} | "
f"tokens={result.get('tokens', 0)}"
)
except Exception as e:
self.logger.error(f"Stage {stage_name} | FAILED | {e}")
action = stage.get("on_failure", "abort")
if action == "retry":
# ... retry logic
pass
elif action == "skip":
self.logger.warning(f"Skipping {stage_name}")
continue
else:
raise
elapsed = time.time() - start_time
self.logger.info(
f"Pipeline COMPLETE | {elapsed:.1f}s | ${total_cost:.3f}"
)
return data
def _save_checkpoint(self, stage_idx, stage_name, data):
checkpoint = {
"run_id": self.run_id,
"stage_idx": stage_idx,
"stage_name": stage_name,
"data": data,
"timestamp": datetime.now().isoformat()
}
path = self.output_dir / f"checkpoint_{stage_idx}_{stage_name}.json"
path.write_text(json.dumps(checkpoint, ensure_ascii=False, indent=2))
קראו את הקוד למעלה בעיון. זהו את שלושת האלמנטים המרכזיים: (1) ה-loop שרץ על שלבים, (2) ה-checkpoint שנשמר אחרי כל שלב, (3) ה-error handling עם הבחירה בין retry/skip/abort. אלו שלוש היכולות שמבדילות orchestrator מ-script פשוט.
Orchestrator ב-TypeScript
אם אתם עובדים ב-TypeScript (בהמשך לפרק 4), הנה אותו רעיון:
// pipeline-orchestrator.ts
import { writeFile, mkdir } from 'fs/promises';
import { join } from 'path';
interface StageResult {
output: Record<string, any>;
cost?: number;
tokens?: number;
}
interface StageConfig {
name: string;
execute: (input: Record<string, any>) => Promise<StageResult>;
onFailure?: 'retry' | 'skip' | 'abort';
maxRetries?: number;
}
class PipelineOrchestrator {
private runId: string;
constructor(
private name: string,
private stages: StageConfig[],
private outputDir: string = './output'
) {
this.runId = new Date().toISOString().replace(/[:.]/g, '-');
}
async run(initialInput: Record<string, any>): Promise<Record<string, any>> {
await mkdir(this.outputDir, { recursive: true });
let data = initialInput;
let totalCost = 0;
const startTime = Date.now();
console.log(`Pipeline '${this.name}' started | Run: ${this.runId}`);
for (let i = 0; i < this.stages.length; i++) {
const stage = this.stages[i];
console.log(`Stage ${i + 1}/${this.stages.length}: ${stage.name} | START`);
try {
const result = await stage.execute(data);
data = result.output;
totalCost += result.cost ?? 0;
await this.saveCheckpoint(i, stage.name, data);
console.log(`Stage ${stage.name} | DONE | cost=$${(result.cost ?? 0).toFixed(3)}`);
} catch (error) {
console.error(`Stage ${stage.name} | FAILED |`, error);
if (stage.onFailure === 'skip') continue;
if (stage.onFailure === 'retry') { /* retry logic */ }
throw error; // default: abort
}
}
const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
console.log(`Pipeline COMPLETE | ${elapsed}s | $${totalCost.toFixed(3)}`);
return data;
}
private async saveCheckpoint(idx: number, name: string, data: any) {
const checkpoint = {
runId: this.runId,
stageIdx: idx,
stageName: name,
data,
timestamp: new Date().toISOString()
};
const path = join(this.outputDir, `checkpoint_${idx}_${name}.json`);
await writeFile(path, JSON.stringify(checkpoint, null, 2));
}
}
Hooks כ-Quality Gates בתוך Pipeline
מערכת Hooks של Claude Code (מגרסה 2.0) מאפשרת לשלב quality gates אוטומטיים בתוך pipeline. Hooks הם סקריפטים שרצים לפני (PreToolUse) או אחרי (PostToolUse) כל tool call — ואפשר להשתמש בהם כדי לבדוק שהפלט של שלב עומד בתנאים לפני שעוברים לשלב הבא. למשל:
- PostToolUse hook על Write: בודק שקובץ output תקין (JSON valid, אורך מינימלי, שדות חובה)
- PreToolUse hook על Bash: מונע הרצת פקודות מסוכנות בתוך pipeline (למשל
rm -rfאוgit push --force) - HTTP Hook: שולח את תוצאות השלב ל-webhook חיצוני (Slack, dashboard) לפני שממשיך
מגרסה 2.1.63 (פברואר 2026) נוספו HTTP Hooks — במקום סקריפט מקומי, ה-hook שולח POST JSON ל-URL ומקבל JSON חזרה. זה מאפשר integration עם מערכות חיצוניות: סיום שלב pipeline → hook שולח notification ל-Slack → pipeline ממשיך. זה לא מחליף את ה-orchestrator, אבל מוסיף שכבת בקרה שרצה ברמת ה-tool call — יותר granular מרמת ה-stage.
חשבו על ה-pipeline שלכם: באיזה שלב quality gate הכי קריטי? כתבו hook פשוט (בדמיון או בקוד) שבודק את הפלט של אותו שלב. מה הבדיקה? מה קורה אם היא נכשלת?
מפתחים שרגילים לפרומפטים ארוכים נוטים לכתוב: "תריץ את ה-pipeline הזה: קודם תחקור, אחר כך תכתוב outline, אחר כך..." — ולתת ל-Claude להחליט מתי כל שלב הסתיים ומה להעביר הלאה. הבעיה: Claude עלול לדלג על שלבים, לשנות את הסדר, או "להחליט" שהמחקר מספק אחרי מקור אחד. הפתרון: ה-orchestrator צריך להיות קוד דטרמיניסטי. Claude מריץ כל שלב, אבל הקוד שלכם מחליט מתי להתקדם.
Skills כשלבי Pipeline
דרך אלגנטית לארגן pipeline ב-Claude Code: כל שלב הוא Skill (קובץ .md עם frontmatter). ה-Skill מגדיר כלים, מודל, effort level, ו-agent — וה-orchestrator מפעיל אותם ברצף. היתרון: שלבים ניתנים לשימוש חוזר, כל שלב הוא /slash-command שאפשר גם להפעיל בנפרד, ו-hot reload תומך בעדכון שלבים בלי לעצור את ה-pipeline.
# .claude/agents/pipeline-research.md (skill as pipeline stage)
---
name: pipeline-research
description: Research stage for content pipeline
model: claude-sonnet-4-6
tools:
- WebSearch
- WebFetch
- Write
---
# Research Stage
You are the Research Agent in a content pipeline.
## Input
Read the brief from `output/brief.json`.
## Task
1. Search for 5+ sources on the topic
2. Extract statistics, quotes, and examples
3. Focus on Israeli market when relevant
## Output
Write results to `output/research.json` with:
- sources[]: URL, title, key quotes
- statistics[]: claim, value, source
- key_findings[]: insight, relevance
ה-orchestrator קורא ל-skills ברצף באמצעות CLI mode: claude -p "/pipeline-research" --bare. הדגל --bare (מגרסה 2.1.81) מדלג על hooks, LSP, ו-plugin sync — מה שמאיץ את ההפעלה ב-pipeline אוטומטי. כל skill כותב output לקובץ מוסכם, ו-skill הבא קורא אותו. file-based data flow, פשוט ויעיל.
זמן: 25 דקות | תוצר: Pipeline orchestrator עובד עם 3 שלבים
- העתיקו את קוד ה-Orchestrator (Python או TypeScript) לפרויקט חדש
- צרו 3 stages פשוטים:
- Stage 1 — Brief: מקבל נושא, מייצר brief עם 3 נקודות מפתח (השתמשו ב-Agent SDK)
- Stage 2 — Draft: מקבל brief, מייצר טיוטת פסקה (300-500 מילים)
- Stage 3 — Review: מקבל טיוטה, מייצר ביקורת עם 3 שיפורים מוצעים
- הריצו את ה-pipeline עם נושא לבחירתכם
- וודאו שיש לוג מלא ו-checkpoint לכל שלב בתיקיית output
- נסו להריץ שוב ובדקו שהתוצאות שונות (non-determinism) אבל המבנה זהה
תוצאה צפויה: תיקיית output עם 3 checkpoint files, קובץ log מלא, וטיוטת תוכן שעברה סבב review.
תקשורת בין שלבים — Data Flow
שאלת מפתח: איך שלב A מעביר את התוצר שלו לשלב B? יש שלוש גישות, וכל אחת מתאימה למצב אחר.
גישה 1: File-Based — דרך קבצים
שלב A כותב output לקובץ (stage1_output.json), שלב B קורא אותו. זו הגישה הכי פשוטה ו-debuggable — אפשר לפתוח את הקובץ ולראות בדיוק מה שלב A ייצר. החיסרון: איטי יותר לנתונים גדולים בגלל I/O.
גישה 2: In-Memory — בזיכרון
שלב A מחזיר dict/object, ה-orchestrator מעביר אותו ישירות לשלב B. זו הגישה הכי מהירה — אין I/O, הכל בזיכרון. החיסרון: אם ה-pipeline קורס באמצע, הכל אבוד.
גישה 3: Hybrid — שילוב (מומלץ)
נתונים עוברים בין שלבים בזיכרון (מהיר), וגם נשמרים לדיסק כ-checkpoint אחרי כל שלב מוצלח (בטוח). זו הגישה שהשתמשנו בה ב-orchestrator למעלה — ה-data עובר ב-data = result["output"] (זיכרון) וגם נשמר ב-_save_checkpoint() (דיסק).
| אם... | אז... | הסיבה |
|---|---|---|
| דיבוג חשוב, הנתונים קטנים | File-Based | אפשר לראות כל שלב בנפרד |
| ביצועים קריטיים, ה-pipeline קצר | In-Memory | אין תקורת I/O |
| pipeline ארוך, קריסה עולה ביוקר | Hybrid (מומלץ) | מהירות + בטיחות + resume |
The Checkpoint Pattern — שמירת מצב
Checkpoint (נקודת שמירה) הוא הרעיון הכי חשוב ב-pipeline engineering. אחרי כל שלב מוצלח, שמרו את מצב ה-pipeline לדיסק. אם ה-pipeline קורס בשלב 6 מתוך 7, אתם יכולים להמשיך משלב 6 במקום להתחיל מחדש.
בלי checkpoints, pipeline בן 7 שלבים שקורס בשלב 6 — 100% מהעלות בזבוז. עם checkpoints — רק עלות שלב 6 בזבוז, ואתם ממשיכים ממקום שעצרתם.
# resume from checkpoint
def resume_pipeline(self, from_stage: int):
"""Load last checkpoint and resume from given stage"""
checkpoint_path = self.output_dir / f"checkpoint_{from_stage - 1}_*.json"
checkpoints = list(self.output_dir.glob(f"checkpoint_{from_stage - 1}_*.json"))
if not checkpoints:
raise ValueError(f"No checkpoint found for stage {from_stage - 1}")
data = json.loads(checkpoints[0].read_text())["data"]
# Run remaining stages
for i, stage in enumerate(self.stages[from_stage:], start=from_stage):
result = stage["execute"](data)
data = result["output"]
self._save_checkpoint(i, stage["name"], data)
return data
חשבו על ה-pipeline שעיצבתם קודם. כמה יעלה כל שלב בערך (בטוקנים ובכסף)? אם השלב האחרון נכשל, כמה עלות הולכת לפח בלי checkpoint? כתבו את המספרים — זה יעזור לכם להבין למה checkpoints הם חובה.
Data Validation — ולידציה בין שלבים
בכל נקודת חיבור בין שלבים, בדקו שהפלט תקין לפני שמעבירים אותו הלאה. זה כמו quality control בפס ייצור — מוצר פגום לא עובר לתחנה הבאה.
# data validation between stages
def validate_stage_output(stage_name: str, output: dict, schema: dict) -> bool:
"""Validate output matches expected schema before passing to next stage"""
required_fields = schema.get("required", [])
for field in required_fields:
if field not in output:
raise ValueError(
f"Stage '{stage_name}' output missing required field: '{field}'"
)
# Check minimum content length
if "min_length" in schema:
content = output.get("content", "")
if len(content) < schema["min_length"]:
raise ValueError(
f"Stage '{stage_name}' output too short: "
f"{len(content)} < {schema['min_length']}"
)
return True
שלב Research מחזיר JSON ריק בגלל שגיאת חיפוש. שלב Draft מקבל את ה-JSON הריק ומייצר מאמר "על סמך המחקר" — שהוא בפועל המצאה מלאה. שלב Edit מנסה לשפר מאמר שמבוסס על אוויר. בסוף מקבלים תוצר שנראה מושלם — אבל הכל בדוי. הפתרון: ולידציה בין כל שני שלבים. JSON תקין? מספיק שדות? תוכן לא ריק? בדקו לפני שממשיכים.
טיפול בשגיאות ושחזור
ב-pipeline אמיתי, שגיאות יקרו. לא אם, אלא מתי. API timeout, rate limit, פלט לא צפוי מ-Claude, שגיאת parsing. השאלה היא לא "איך למנוע שגיאות" אלא "מה לעשות כשהן קורות."
The Recovery Ladder — סולם ההתאוששות
כשמשהו נכשל, עולים בסולם עד שמוצאים פתרון:
- Retry Same — הריצו שוב את אותה קריאה. פותר שגיאות חולפות (timeout, rate limit). השתמשו ב-exponential backoff: המתנה של 1 שנייה, 2, 4, 8.
- Retry Different — שנו משהו בקריאה: פרומפט אחר, מודל אחר, פחות טוקנים. אם הגישה נכשלה, אולי צריך גישה אחרת.
- Skip Stage — דלגו על השלב אם הוא לא קריטי. שלב "Polish" שנכשל? ה-pipeline עדיין מייצר תוצר שמיש.
- Pause for Human — עצרו את ה-pipeline ושלחו התראה (Slack, email). בן אדם מחליט מה לעשות.
- Abort — עצרו הכל. שמרו checkpoint, כתבו לוג, צאו. לשלבים שנכשלים בהם זה מסוכן להמשיך (ולידציה, אבטחה).
Error Matrix — טבלת טיפול בשגיאות
לכל שלב, הגדירו מראש מה קורה בכל סוג שגיאה:
| שלב | Timeout/Rate Limit | פלט לא תקין | תוכן ריק | שגיאה לא צפויה |
|---|---|---|---|---|
| Research | Retry x3 (backoff) | Retry Different (שנה search terms) | Abort (אין מה להמשיך בלי מחקר) | Abort + Log |
| Draft | Retry x3 (backoff) | Retry Different (שנה prompt) | Retry Different | Abort + Log |
| Edit | Retry x3 (backoff) | Skip (הטיוטה עדיין שמישה) | Skip | Skip + Log |
| SEO | Retry x2 | Skip | Skip | Skip + Log |
| QA | Retry x3 | Pause for Human | Abort | Abort + Log |
צרו Error Matrix ל-pipeline שלכם. לכל שלב, כתבו מה לעשות ב-4 סוגי שגיאות: timeout, פלט לא תקין, תוכן ריק, שגיאה לא צפויה. הכלל: שלבים קריטיים → abort. שלבים אופציונליים → skip. שגיאות חולפות → retry.
Retry עם Exponential Backoff
# retry with exponential backoff
import asyncio
import random
async def retry_with_backoff(
func,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0
):
"""Retry a function with exponential backoff and jitter"""
for attempt in range(max_retries + 1):
try:
return await func()
except Exception as e:
if attempt == max_retries:
raise # Last attempt, give up
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {total_delay:.1f}s...")
await asyncio.sleep(total_delay)
שימו לב ל-jitter (רעש אקראי) — זה מונע מצב שבו 10 pipelines שנכשלו באותו רגע גם עושים retry באותו רגע ויוצרים גל נוסף של rate limits.
Retry with Variation — לנסות אחרת
לפעמים retry same לא מספיק. אם הסוכן נכשל לא בגלל timeout אלא בגלל שהגישה שלו לא עבדה, צריך retry different: שנו את הפרומפט, החליפו מודל, או צמצמו את המשימה.
# retry with variation — different approach each attempt
async def retry_with_variation(stage_name: str, data: dict, variations: list):
"""Try different approaches for the same stage"""
for i, variation in enumerate(variations):
try:
print(f"Stage {stage_name}: Attempt {i+1} with variation '{variation['name']}'")
result = await variation["execute"](data)
return result
except Exception as e:
print(f"Variation '{variation['name']}' failed: {e}")
if i == len(variations) - 1:
raise # All variations exhausted
# Usage:
research_variations = [
{"name": "broad_search", "execute": lambda d: research_agent(d, terms="broad")},
{"name": "specific_search", "execute": lambda d: research_agent(d, terms="specific")},
{"name": "alternative_sources", "execute": lambda d: research_agent(d, sources="academic")}
]
result = await retry_with_variation("research", data, research_variations)
הדפוס הזה חזק במיוחד לשלבי Research ו-Draft — אם חיפוש רחב נכשל, נסו חיפוש ספציפי. אם סגנון כתיבה אחד לא עובד, נסו סגנון אחר. כל variation היא "גישה שונה" — לא חזרה על אותו דבר.
Pipeline בלי error handling שנכשל ומתחיל מחדש 3 פעמים — כפול 3 בעלויות. עם Opus 4.6 וטוקנים של $15/$75 ל-MTok input/output (נכון למרץ 2026), pipeline בן 7 שלבים שעולה 30-100 ₪ יכול להפוך ל-90-300 ₪ בזבוז. checkpoints + retry logic חוסכים את רוב הנזק. תחשבו על זה כפוליסת ביטוח — ההשקעה קטנה, החיסכון ענק.
גרסאות ו-Rollback
Pipeline משתנה לאורך זמן: משפרים פרומפטים, מחליפים מודלים, מוסיפים שלבים, מסירים שלבים. בלי ניהול גרסאות, אי אפשר לדעת איזו גרסה של ה-pipeline ייצרה איזו תוצאה. ויותר חשוב — אי אפשר לחזור לגרסה שעבדה כשמשהו מתקלקל.
שלוש רמות של Versioning
| רמה | מה מנהלים בגרסאות | איך |
|---|---|---|
| Pipeline Versioning | שינויים ב-orchestrator, שלבים, סדר | Git tags (v1.0, v1.1, v2.0) |
| Prompt Versioning | שינויים בפרומפטים של כל שלב | קבצים נפרדים: prompts/research-v1.md, research-v2.md |
| Output Versioning | תוצרים של כל ריצה | תיקיות timestamped: output/20260324_143000/ |
Prompt Versioning — הכי חשוב
פרומפטים משתנים הרבה יותר לעתים קרובות מקוד. הכלל: לעולם אל תערכו פרומפט in-place. תמיד צרו גרסה חדשה. זה מאפשר:
- השוואה: הריצו v1 ו-v2 של הפרומפט על אותו input, השוו תוצאות
- Rollback: אם v2 גרוע יותר — פשוט חזרו ל-v1
- A/B Testing: הריצו שתי גרסאות במקביל ובדקו מה מייצר תוצאה טובה יותר
# prompt versioning pattern
pipeline/
├── prompts/
│ ├── research/
│ │ ├── v1.md # הגרסה המקורית
│ │ ├── v2.md # שיפור — מקורות יותר ספציפיים
│ │ └── v3.md # ניסיון עם format שונה
│ ├── draft/
│ │ ├── v1.md
│ │ └── v2.md
│ └── edit/
│ └── v1.md
├── config.json # מגדיר איזו גרסת prompt כל שלב משתמש
└── orchestrator.py
# config.json — which prompt version each stage uses
{
"pipeline_version": "1.2.0",
"stages": {
"research": { "prompt": "prompts/research/v2.md", "model": "claude-sonnet-4-6" },
"draft": { "prompt": "prompts/draft/v1.md", "model": "claude-opus-4-6" },
"edit": { "prompt": "prompts/edit/v1.md", "model": "claude-sonnet-4-6" }
}
}
צרו תיקיית prompts/ בפרויקט ה-pipeline שלכם. לכל שלב, צרו תת-תיקייה עם קובץ v1.md שמכיל את הפרומפט הנוכחי. צרו config.json שמצביע על כל הגרסאות. מהרגע הזה, כשתשנו פרומפט — תמיד v2.md חדש.
Rollback Procedure
כשגרסה חדשה של ה-pipeline מייצרת תוצאות גרועות יותר:
- זהו את הגרסה האחרונה שעבדה (Git log + output comparison)
- חזרו ל-Git tag של אותה גרסה:
git checkout v1.1.0 - הריצו ריצת בדיקה כדי לוודא שהתוצאות חזרו לאיכות הצפויה
- חקרו למה הגרסה החדשה נכשלה — בדרך כלל שינוי פרומפט שנראה תמים
"רק שינוי קטן בפרומפט של שלב 3..." — ופתאום ה-pipeline מייצר תוצאות שונות לגמרי. בלי גרסה קודמת שמורה, אי אפשר לחזור. הפתרון: גרסאות פרומפט הן בחינם (קבצי טקסט). הרגל: כל שינוי = קובץ חדש. v1, v2, v3. לעולם לא מוחקים גרסה ישנה.
לוגים, מוניטורינג ועלויות
Pipeline בלי לוגים הוא כמו נהיגה בחושך. כשמשהו נכשל — ואיפה זה יקרה — אתם צריכים לדעת בדיוק מה קרה, באיזה שלב, כמה זה עלה, וכמה זמן לקח.
Structured Logging
Structured Logging (לוגים מובנים) אומר שכל שורת לוג היא JSON עם שדות קבועים, לא טקסט חופשי. זה מאפשר חיפוש, סינון, וניתוח אוטומטי.
# structured log entry — what every stage should log
{
"timestamp": "2026-03-24T14:30:00.000Z",
"run_id": "20260324_143000",
"pipeline": "content-pipeline",
"stage": "research",
"stage_index": 1,
"event": "stage_complete",
"duration_seconds": 45.2,
"tokens_input": 1250,
"tokens_output": 3800,
"cost_usd": 0.082,
"model": "claude-sonnet-4-6",
"success": true,
"output_summary": "Found 5 sources, 12 statistics, 8 key findings"
}
Log Levels
| Level | מה לכתוב | מתי |
|---|---|---|
| DEBUG | שיחת סוכן מלאה, כל tool call | פיתוח ודיבוג בלבד |
| INFO | התחלה/סיום שלב, החלטות מפתח, תוצאות | ריצה רגילה |
| WARN | שגיאות שנפתרו (retry הצליח), ביצועים חריגים | דברים שכדאי לשים לב אליהם |
| ERROR | כשלונות, שלבים שנדלגו, aborts | דברים שדורשים טיפול |
הוסיפו structured logging לשלב אחד ב-pipeline שלכם. לוגו: זמן התחלה, זמן סיום, עלות בטוקנים, עלות בדולר, האם הצליח, וסיכום פלט של שורה אחת. שמרו את הלוג כ-JSON ב-output directory.
Cost Tracking — מעקב עלויות
כסף. בואו נדבר על כסף. Pipelines הם הרכיב הכי יקר בעבודה עם Claude כי הם מבצעים קריאות API מרובות. צריך לעקוב אחרי עלויות ברמת שלב, ריצה, יום, ושבוע.
# cost tracking aggregation
class CostTracker:
def __init__(self):
self.costs = []
def add(self, stage: str, cost: float, tokens: int):
self.costs.append({
"stage": stage,
"cost": cost,
"tokens": tokens,
"timestamp": datetime.now().isoformat()
})
def summary(self) -> dict:
total = sum(c["cost"] for c in self.costs)
by_stage = {}
for c in self.costs:
stage = c["stage"]
by_stage[stage] = by_stage.get(stage, 0) + c["cost"]
# Find most expensive stage
most_expensive = max(by_stage, key=by_stage.get) if by_stage else None
return {
"total_cost": round(total, 4),
"by_stage": by_stage,
"most_expensive_stage": most_expensive,
"recommendation": (
f"Optimize '{most_expensive}' — "
f"it's {by_stage[most_expensive]/total*100:.0f}% of total cost"
) if most_expensive else "No data"
}
Pipeline Monitoring עם כלי Claude Code
Claude Code עצמו מספק כלים שמתאימים למעקב אחרי pipelines:
- Statusline: שורת הסטטוס מציגה עלות, טוקנים, מודל, ו-latency בזמן אמת. מגרסה 2.1.80 נוספו שדות rate-limit (חלון 5 שעות ו-7 ימים) — מידע קריטי כשמריצים pipelines בנפח גבוה
- Remote Control (פברואר 2026): מעקב וניווט session מהטלפון או מדפדפן. כשה-pipeline רץ בשרת — אפשר לעקוב אחרי ההתקדמות מכל מקום
- /loop (מרץ 2026): הרצת משימות חוזרות — למשל
/loop 5m check pipeline status. מתאים ל-health check אוטומטי בתוך session פעיל - Channels (מרץ 2026, research preview): שליחת הודעות מ-Telegram או Discord לתוך session Claude Code. אפשר לקבל notifications על pipeline status ישירות ל-Telegram
Alerting — התראות
הגדירו התראות על:
- כשל pipeline: Pipeline נכשל ולא הצליח להתאושש
- עלות חריגה: ריצה אחת עלתה יותר מ-threshold (למשל $5)
- זמן חריג: Pipeline לקח יותר מ-2x מהממוצע
- איכות נמוכה: שלב QA דיווח על בעיות שלא טופלו
ביצוע מקבילי של שלבים
לא כל שלב ב-pipeline חייב לחכות לקודמו. לפעמים יש שלבים עצמאיים שאפשר להריץ במקביל. הטריק: לזהות אילו שלבים באמת עצמאיים ואילו רק נראים עצמאיים.
דוגמה: אחרי שלב Research, שלבי "Outline" ו-"Image Generation" יכולים לרוץ במקביל — כי Outline צריך את תוצאות המחקר, ו-Image Generation גם צריך את תוצאות המחקר, אבל הם לא צריכים אחד את השני.
# parallel stages in Python
import asyncio
async def run_parallel_stages(data: dict, stages: list) -> dict:
"""Run multiple stages in parallel and merge their outputs"""
tasks = [stage["execute"](data) for stage in stages]
results = await asyncio.gather(*tasks, return_exceptions=True)
merged_output = {}
for stage, result in zip(stages, results):
if isinstance(result, Exception):
print(f"Stage {stage['name']} failed: {result}")
if stage.get("on_failure") != "skip":
raise result
continue
merged_output[stage["name"]] = result["output"]
return merged_output
// parallel stages in TypeScript
async function runParallelStages(
data: Record<string, any>,
stages: StageConfig[]
): Promise<Record<string, any>> {
const results = await Promise.allSettled(
stages.map(stage => stage.execute(data))
);
const merged: Record<string, any> = {};
for (let i = 0; i < stages.length; i++) {
const result = results[i];
if (result.status === 'fulfilled') {
merged[stages[i].name] = result.value.output;
} else {
console.error(`Stage ${stages[i].name} failed:`, result.reason);
if (stages[i].onFailure !== 'skip') throw result.reason;
}
}
return merged;
}
| שאלה | אם כן | אם לא |
|---|---|---|
| שלב B צריך את הפלט של שלב A? | סדרתי | אפשר מקביל |
| שלב B כותב לאותו קובץ כמו שלב A? | סדרתי (מניעת קונפליקט) | אפשר מקביל |
| שני השלבים יחד חורגים מ-rate limit? | סדרתי (או עם throttling) | מקביל |
הסתכלו על ה-pipeline שלכם. האם יש שני שלבים שמקבלים את אותו input ולא תלויים זה בזה? אם כן — סמנו אותם כמועמדים לביצוע מקבילי. חשבו: כמה זמן זה יחסוך?
ביצוע מקבילי מכפיל את קריאות ה-API. אם יש לכם rate limit של 60 requests/minute ו-pipeline בן 7 שלבים, ריצה סדרתית צריכה 7 requests. ריצה עם 3 שלבים מקבילים צריכה 5 requests — אבל 3 מהם נשלחים באותו רגע. אם ה-rate limit שלכם הוא per-second ולא per-minute, שלושה requests בו-זמנית יכולים לגרום ל-429 errors ו-backoff אגרסיבי שמאט את כל ה-pipeline. הפתרון: תכננו את המקביליות בהתאם ל-rate limits שלכם, לא רק ל-CPU. בדקו את ה-statusline של Claude Code — מגרסה 2.1.80 היא מציגה rate-limit windows (5 שעות ו-7 ימים) בזמן אמת.
זמן: 20 דקות | תוצר: Pipeline עם לפחות שלב מקבילי אחד
- קחו את ה-pipeline מהתרגיל הקודם (Brief → Draft → Review)
- הוסיפו שלב "Keywords" שרץ במקביל לשלב Draft — שניהם מקבלים את ה-Brief כ-input
- שלב Keywords מייצר רשימה של 10 מילות מפתח שקשורות לנושא
- עדכנו את ה-orchestrator להריץ Draft ו-Keywords במקביל (asyncio.gather / Promise.all)
- שלב Review מקבל את שניהם: את הטיוטה ואת מילות המפתח
- בדקו שהלוג מראה ששני השלבים רצו במקביל (timestamps חופפים)
תוצאה צפויה: Pipeline מהיר יותר שמייצר טיוטה ומילות מפתח בו-זמנית.
דוגמה מייצגת: ה-Pipeline בן 7 השלבים
זו לא דוגמה תיאורטית. זה pipeline אמיתי, שפועל בפרודקשן, ויצר את הפרקים של nVision Academy שאתם קוראים עכשיו. 87 פרקים, למעלה מ-600,000 מילים, נכתבו באמצעות pipeline אוטומטי מרובה שלבים. בואו ננתח אותו.
7 השלבים
| שלב | שם | תפקיד | מודל | כלים |
|---|---|---|---|---|
| 1 | Brief Agent | מקבל נושא → מייצר brief מובנה עם מטרות, קהל יעד, טון, ונקודות מפתח | Sonnet 4.6 | Read, Write |
| 2 | Research Agent | מחפש באינטרנט נתונים עדכניים, סטטיסטיקות, דוגמאות, ודעות מומחים | Sonnet 4.6 | WebSearch, WebFetch, Write |
| 3 | Outline Agent | יוצר outline מפורט מה-brief + מחקר — כותרות, תת-נושאים, נקודות | Sonnet 4.6 | Read, Write |
| 4 | Writing Agent | כותב את הפרק המלא לפי ה-outline, שומר על קול אחיד ועומק (5K-10K מילים) | Opus 4.6 | Read, Write, WebFetch |
| 5 | Editor Agent | בודק בהירות, דיוק, שלמות, דקדוק, עקביות טון, ואיכות עברית | Opus 4.6 | Read, Edit |
| 6 | SEO/Format Agent | אופטימיזציית כותרות, תגיות, פורמט HTML, עקביות CSS classes | Sonnet 4.6 | Read, Edit, Grep |
| 7 | QA Agent | בדיקה סופית: קישורים, עובדות, שלמות כל הסעיפים, אין placeholder text | Sonnet 4.6 | Read, Grep, Bash |
ביצועים
- זמן לפרק: 15-25 דקות ל-5,000-10,000 מילים
- עלות לפרק: $3-8 (תלוי באורך ובמספר חיפושי WebSearch)
- שלב יקר ביותר: Writing Agent (שלב 4) — ~50% מהעלות הכוללת (Opus + output ארוך)
- שלב מהיר ביותר: Brief Agent (שלב 1) — ~30 שניות
- שיעור הצלחה: ~95% ריצות מסתיימות בהצלחה בלי התערבות אנושית
- סוכנים: 8 סוכנים מוגדרים ב-
.claude/agents/, כל אחד עם כלים, מודל, והנחיות ספציפיות
הסיבה שה-pipeline הזה עובד: כל שלב ממוקד במשימה אחת. ה-Research Agent לא צריך לכתוב — הוא רק מחפש. ה-Writing Agent לא צריך לחפש — הוא מקבל מחקר מוכן. ה-Editor Agent לא צריך לכתוב מחדש — הוא רק מתקן. הפוקוס הזה מייצר איכות גבוהה יותר מ-"תכתוב מאמר מלא מאפס" בפרומפט אחד.
איך השלבים מתחברים — זרימת הנתונים
בואו נעקוב אחרי ריצה אחת של ה-pipeline מתחילתה ועד סופה:
- Input: "פרק 6: Building Production Pipelines, קורס CC Production & SDK, פרופיל Skill-Building"
- Brief Agent (30 שניות): מייצר
brief.jsonעם מטרות למידה, קהל יעד (מפתחים), טון (מנטורי), 12 נקודות מפתח - Research Agent (3-5 דקות): מחפש ב-WebSearch מאמרים על AI pipelines, מוצא סטטיסטיקות, דוגמאות. מייצר
research.jsonעם 8 מקורות ו-15 עובדות - Outline Agent (1-2 דקות): מקבל brief + research, מייצר outline מפורט עם 11 סקציות, כותרות H2 ו-H3, ונקודות תבליט
- Writing Agent (8-12 דקות): זה השלב הכבד — Opus 4.6 כותב את הפרק המלא, 5K-10K מילים, עם כל 15 אלמנטי Gold Standard. מייצר HTML
- Editor Agent (3-5 דקות): קורא את ה-HTML, מתקן עברית, בודק שכל CSS class קיים, מוסיף סימני פיסוק חסרים
- SEO/Format Agent (1-2 דקות): מוודא עקביות כותרות, תגיות section-tags על כל H2, ניקוי HTML
- QA Agent (1-2 דקות): בדיקה סופית — אין placeholder text, כל הסקציות קיימות, אין HTML שבור
סה"כ: ~20 דקות, ~$5. התוצר: פרק מלא, ערוך, מפורמט, ומוכן לפרסום. ותשוו את זה לכתיבה ידנית שלוקחת 8-15 שעות.
איך Claude Code מאיץ את ה-Pipeline
ה-pipeline הזה לא רץ "סתם" — הוא ממנף תכונות ספציפיות של Claude Code:
- Agent files (
.claude/agents/): כל שלב מוגדר כקובץ agent נפרד עם model, tools, ו-instructions ספציפיים. שינוי שלב = עריכת קובץ אחד - Checkpoint System: מערכת ה-checkpoints של Claude Code (מגרסה 2.0) שומרת snapshots — אם שלב Writing קורס אחרי 8 דקות, אפשר לחזור לנקודה שלפני באמצעות
/rewind - Context isolation: כל שלב רץ כ-subagent עם context window משלו — שלב Writing לא "רואה" את כל שיחת ה-Research, הוא מקבל רק את ה-output המסונתז. זה מפנה 100% מה-context ל-task הנוכחי
- Headless mode (
-p): שלבים שלא צריכים אינטראקציה רצים עםclaude -p "..."— בלי ממשק אינטראקטיבי, מהיר יותר, ואפשר לתזמן עם cron --bareflag: לשלבים פשוטים (Brief, SEO), הדגל מדלג על hooks ו-plugins ומאיץ startup
פתחו טרמינל והריצו: claude -p "Summarize this file in 3 bullets: [path to a file]". שימו לב לזמן — זו קריאה אחת של pipeline stage ב-headless mode. עכשיו הריצו את אותו דבר עם --bare. ראו כמה זמן נחסך ב-startup. זה ההבדל כש-pipeline רץ את זה 7 פעמים.
למה Pipeline עולה על Single Prompt?
| היבט | Single Prompt | Pipeline בן 7 שלבים |
|---|---|---|
| Focus | הסוכן צריך לעשות הכל בבת אחת | כל שלב ממוקד במשימה אחת |
| Context | Context מלא = פחות מקום לתוכן | כל שלב מתחיל עם context נקי |
| Error Recovery | שגיאה = התחלה מחדש של הכל | שגיאה = חזרה על שלב אחד |
| Quality | סביר עבור טקסט קצר | גבוה ועקבי עבור תוכן ארוך |
| Debugging | "משהו לא טוב" — אבל מה? | "שלב 3 ייצר outline חלש" — ברור |
בחרו דומיין שלכם (פיתוח, שיווק, תוכן, דאטה). כתבו pipeline בן 5-7 שלבים שמתאים לדומיין שלכם, בפורמט: שם שלב → input → output → מודל. השתמשו ב-case study הזה כהשראה, לא כעותק.
Pipeline Patterns לתחומים שונים
לכל תחום יש מבנה pipeline טבעי משלו. הנה חמישה דפוסים עם הסבר מתי כל אחד מתאים.
1. Code Generation Pipeline
Requirements → Design → Implementation → Testing → Review → Documentation → Deploy
מתי: כשצריך לייצר feature שלם מ-requirements. שלב Testing הוא quality gate — אם טסטים נכשלים, ה-pipeline חוזר לשלב Implementation (retry different). שלב Review משתמש ב-Opus לביקורת קוד מעמיקה.
2. Data Processing Pipeline
Ingest → Validate → Clean → Transform → Analyze → Report → Archive
מתי: כשמעבדים נתונים שמגיעים מבחוץ. שלב Validate הוא quality gate קריטי — נתונים לא תקינים = abort. שלבי Clean ו-Transform יכולים לרוץ עם Sonnet (זול יותר) כי המשימה מכנית.
3. Customer Support Pipeline
Receive → Classify → Research → Draft Response → Review → Send → Follow-up
מתי: כשמטפלים בפניות לקוחות. שלב Classify מחליט אם הפנייה טכנית, billing, או כללית — וזה משנה את הנתיב (routing). שלב Review חובה לפני שליחה — לעולם אל תשלחו תשובה ללקוח בלי review.
4. DevOps Pipeline
Monitor → Detect → Diagnose → Fix → Test → Deploy → Verify
מתי: כש-monitoring מזהה anomaly. שלב Detect כולל סף (threshold) — לא כל anomaly שווה תגובה. שלב Fix חייב להיות זהיר — ב-production, fix שגוי גרוע יותר מאי-עשייה.
5. Content Marketing Pipeline
Ideation → Research → Draft → Edit → SEO → Publish → Promote
מתי: כשמייצרים תוכן שיווקי. שלב Ideation יכול לרוץ ב-batch — לייצר 20 רעיונות, בן אדם בוחר, Pipeline ממשיך עם הנבחר. שלבי Edit ו-SEO יכולים לרוץ במקביל.
סטארטאפ ישראלי שמפתח מוצר SaaS יכול להשתמש ב-Code Generation Pipeline לפיצ'רים חדשים, ב-Customer Support Pipeline לטיפול בפניות (בעברית ובאנגלית — שלב Classify מזהה שפה ו-routing מתאים), וב-Content Marketing Pipeline לבלוג ולרשתות חברתיות. שלושה pipelines, שלושה תחומים, אותו orchestrator בסיסי. חברת fintech ישראלית יכולה להוסיף שלב compliance שבודק התאמה לרגולציה של רשות שוק ההון — pipeline stage ספציפי לשוק המקומי שאין לו מקבילה ב-templates הגנריים.
חברות טכנולוגיה ישראליות שמשתמשות ב-AI pipelines בפרודקשן מדווחות על חיסכון של 60-80% בזמן עבודה ידנית על תהליכים חוזרים כמו code review, תיעוד, ועיבוד נתונים. היתרון הייחודי של Pipeline מול קריאה בודדת: ב-pipeline אפשר לשלב שלב של לוקליזציה לעברית שבודק RTL, ניקוד, ומינוח ישראלי — שלב שנכשל ב-single prompt כי אין מספיק context לעשות הכל בבת אחת.
The Universal Pipeline Template
כל pipeline שונה, אבל יש תבנית אוניברסלית שמתאימה כנקודת התחלה:
Input → Validate → Process (1-3 stages) → Review → Output → Archive
התחילו מ-3 שלבים: Process → Review → Output. הוסיפו שלבים רק כשהאיכות דורשת את זה. Pipeline עם 12 שלבים שרוב השלבים לא מוסיפים ערך — הוא בזבוז זמן, כסף, ומורכבות. הכלל: כל שלב שמוסיפים חייב לענות על השאלה "מה התוצר יפסיד בלי השלב הזה?" אם אין תשובה ברורה — לא מוסיפים.
מפתחים רבים מתלהבים מ-pipelines ובונים 12 שלבים מהיום הראשון. התוצאה: pipeline שלוקח 45 דקות ועולה $15 לריצה, כש-pipeline בן 5 שלבים היה מייצר תוצאה כמעט זהה ב-15 דקות ו-$5. הכלל: התחילו עם 3-5 שלבים. הוסיפו שלב רק כשיש הוכחה שהתוצר משתפר. מדדו לפני ואחרי.
זמן: 25 דקות | תוצר: מסמך Pipeline Design מלא
- בחרו דומיין שרלוונטי לעבודה שלכם (אם לא בטוחים — בחרו Content Marketing)
- הגדירו את הפלט הסופי — מה ה-pipeline מייצר?
- עבדו אחורה ורשמו 5-7 שלבים
- לכל שלב, מלאו Stage Specification: Name, Input, Agent Config, Output, Success Criteria, Failure Action
- זהו שלבים שאפשר להריץ במקביל
- צרו Error Matrix — מה קורה בכל סוג שגיאה
- הוסיפו אומדן עלות ל-run אחד
תוצאה צפויה: מסמך Pipeline Design מלא שאפשר לבנות ממנו orchestrator.
מבין 5 ה-patterns שראינו, בחרו את הקרוב ביותר לעבודה שלכם. רשמו: (1) איזה pattern? (2) כמה שלבים הייתם צריכים? (3) איזה שלב הוא הכי קריטי (שם ה-abort הכי חשוב)?
Scaling — הרחבת Pipelines
Pipeline אחד שעובד — מצוין. עכשיו רוצים להריץ 10 pipelines במקביל. או 100. זה דורש חשיבה שונה.
Horizontal Scaling — הרחבה אופקית
במקום pipeline אחד שרץ מהר יותר, מריצים כמה pipelines במקביל. לדוגמה: 10 מאמרים שרצים ב-10 instances של אותו pipeline בו-זמנית. הצוואר בקבוק הוא לא ה-CPU — הוא ה-API rate limits.
Queue-Based Architecture
ארכיטקטורת תור: במקום להריץ pipelines ישירות, שולחים inputs לתור (Redis, RabbitMQ, Amazon SQS). Workers שולפים מהתור ומריצים pipelines. יתרונות:
- Decoupling: מי ששולח input לא צריך לחכות לתוצאה
- Reliability: אם worker קורס, ה-message חוזר לתור
- Scalability: רוצים להריץ מהר יותר? הוסיפו workers
Cost Budgeting — תקצוב עלויות
כשמריצים pipelines ב-scale, עלויות יכולות לברוח. הגדירו:
| מגבלה | דוגמה | פעולה כשחורגים |
|---|---|---|
| Per-run limit | $10 לריצה | Abort pipeline + alert |
| Daily limit | $100 ליום | Pause queue + email |
| Monthly budget | $2,000 לחודש | Shutdown + management alert |
מפתחים מחשבים "pipeline עולה $5 לריצה, 20 ריצות ביום = $100 ליום." אבל שוכחים retries. אם שיעור כשל של 10% עם ממוצע 2 retries לכשל, העלות האמיתית היא $110-115 ליום — עלייה של 15%. עם Opus 4.6 שהוא המודל היקר (input: ~$15/MTok, output: ~$75/MTok לפי אומדן), retry אחד של שלב writing יכול לעלות $2-4. הפתרון: תמיד הוסיפו 20% buffer לתקציב ה-pipeline עבור retries, timeouts, ו-debugging. תעקבו אחרי retry rate כמטריקה — אם הוא עולה, יש בעיה בפרומפט או ב-API.
The Backpressure Pattern
Backpressure (לחץ לאחור) הוא מנגנון שמאט את קצב הקלט כשהמערכת לא מספיקה לעבד. אם 100 items מחכים בתור אבל ה-workers מספיקים לעבד רק 10 בשעה — אל תוסיפו עוד items לתור. חכו שהתור יתרוקן, או הוסיפו workers.
# simple backpressure implementation
class PipelineQueue:
def __init__(self, max_queue_size: int = 50, max_concurrent: int = 5):
self.max_queue_size = max_queue_size
self.max_concurrent = max_concurrent
self.queue = []
self.running = 0
def can_accept(self) -> bool:
"""Check if queue can accept new items"""
return len(self.queue) < self.max_queue_size
async def submit(self, input_data: dict) -> bool:
if not self.can_accept():
print(f"Queue full ({len(self.queue)}/{self.max_queue_size}). "
f"Rejecting input. Try again later.")
return False
self.queue.append(input_data)
await self._process_if_capacity()
return True
async def _process_if_capacity(self):
while self.queue and self.running < self.max_concurrent:
item = self.queue.pop(0)
self.running += 1
# Run pipeline in background
asyncio.create_task(self._run_and_release(item))
async def _run_and_release(self, input_data):
try:
await self.pipeline.run(input_data)
finally:
self.running -= 1
await self._process_if_capacity()
Monitoring at Scale
כשמריצים עשרות ומאות pipelines, אתם צריכים dashboard מרכזי שמציג: כמה pipelines רצים עכשיו, כמה מחכים בתור, מה שיעור ההצלחה, מה העלות המצטברת. אפשר לבנות dashboard פשוט ב-HTML שקורא מקבצי הלוג — או להשתמש בכלים כמו Grafana עם JSON logs.
חשבו: אם הייתם צריכים להריץ את ה-pipeline שלכם על 100 items, כמה זמן זה היה לוקח סדרתית? כמה זמן עם 5 workers במקביל? מה rate limit ה-API שלכם מאפשר? כתבו את 3 המספרים האלה — הם מגדירים את תקרת ה-scaling שלכם.
סטארטאפ ישראלי שמריץ pipeline לעיבוד 500 כרטיסי support ביום: בעלות ממוצעת של 2 ₪ לכרטיס, זה 1,000 ₪ ליום, ~22,000 ₪ לחודש, ~$6,000. לא מעט. הגדרת cost budget וניטור יומי הם חובה — לא מותרות. טיפ: התחילו עם 50 כרטיסים ביום, מדדו עלות אמיתית, ואז הגדילו בהדרגה.
זמן: 30 דקות | תוצר: Production Pipeline מלא
- בחרו אחד: Content Pipeline, Code Review Pipeline, או Data Processing Pipeline
- בנו orchestrator עם לפחות 4 שלבים (השתמשו בקוד מהפרק כ-base)
- כל שלב משתמש ב-Agent SDK (Python או TypeScript) עם agent configuration ספציפי
- הוסיפו checkpoint אחרי כל שלב + resume function
- הוסיפו Error Matrix עם לפחות retry ו-abort
- הוסיפו structured logging — JSON log לכל שלב
- הוסיפו cost tracking שמסכם עלות כוללת בסוף
- הריצו את ה-pipeline על input אמיתי ובדקו את ה-output, logs, ו-checkpoints
תוצאה צפויה: תיקיית output עם: תוצר סופי, checkpoint files, structured log, ודוח עלות.
בנוסף לשגרה מפרקים קודמים (ניהול סוכנים, צוותים):
| תדירות | משימה | זמן |
|---|---|---|
| יומי | בדקו את הלוגים של pipelines שרצו — שגיאות, עלויות חריגות, זמנים ארוכים | 5 דקות |
| יומי | וודאו שעלות יומית לא חרגה מהתקציב שהגדרתם | 2 דקות |
| שבועי | השוו תוצרים מריצות שונות — האם האיכות עקבית? | 15 דקות |
| שבועי | בדקו cost breakdown לפי שלב — מהו השלב הכי יקר? אפשר לאופטמז? | 10 דקות |
| שבועי | בדקו אם יש פרומפטים שצריכים update (מודל חדש, API שהשתנה) | 10 דקות |
| חודשי | עשו A/B test על פרומפט חדש מול פרומפט ישן בשלב אחד | 30 דקות |
| חודשי | בדקו אם שלבים חדשים נדרשים או שאפשר לאחד שלבים קיימים | 20 דקות |
| חודשי | עדכנו version tag ו-changelog של ה-pipeline | 10 דקות |
בנו pipeline בן 3 שלבים: Brief → Draft → Review. השתמשו ב-Orchestrator מהפרק (Python או TypeScript), צרו 3 stages שכל אחד מריץ Agent SDK, הוסיפו checkpoint אחרי כל שלב. 30 דקות עבודה. מהרגע הזה יש לכם תשתית pipeline שאפשר להרחיב ל-5 שלבים, 7, 10 — פשוט מוסיפים stages. כל שיפור נוסף — error matrix, parallel stages, cost tracking, versioning — הוא בונוס שנבנה על הבסיס הזה.
ענו על לפחות 4 מתוך 5 כדי לעבור:
- למה ה-Orchestrator צריך להיות קוד דטרמיניסטי ולא סוכן AI? מה הסיכון בלתת ל-Claude לנהל את ה-flow?
(רמז: דילוג על שלבים, שינוי סדר, החלטות לא צפויות, עלות טוקנים על flow control) - מה ההבדל בין Pipeline ל-Team, ואיך מחליטים מה להשתמש?
(רמז: תלות בין שלבים — סדרתי vs. מקבילי, פלט של אחד הוא קלט של השני) - למה checkpoint חשוב, וכמה כסף pipeline בן 7 שלבים בלי checkpoints יכול לבזבז?
(רמז: קריסה בשלב 6 = 100% בזבוז בלי checkpoints, ~17% בזבוז עם checkpoints) - למה לא לערוך פרומפטים in-place? מה היתרון של prompt versioning?
(רמז: rollback, A/B testing, השוואת תוצאות, אי אפשר לחזור בלי גרסה קודמת) - מה ה-"Recovery Ladder" ובאיזה סדר עולים עליו? מתי skip ומתי abort?
(רמז: retry same → retry different → skip → human → abort. שלב אופציונלי = skip, שלב קריטי = abort)
בפרק הזה עברנו מסוכנים בודדים וצוותים ל-צינורות עבודה אוטומטיים — המבנה שהכי מתאים לתהליכים סדרתיים שבהם כל שלב בונה על הקודם. התובנה המרכזית: pipeline טוב הוא לא רק "שרשרת פרומפטים" — הוא מערכת הנדסית עם orchestrator דטרמיניסטי, checkpoints ששומרים מצב, error handling עם recovery ladder, prompt versioning שמאפשר rollback, ו-structured logging שמאפשר ניתוח עלויות ודיבוג. ראינו pipeline אמיתי בן 7 שלבים שמייצר תוכן ברמה גבוהה ב-15-25 דקות, ולמדנו שהפוקוס של כל שלב במשימה אחת הוא מה שמייצר את האיכות. בפרק הבא נעבור ל-Docker, שרתים מרוחקים, ופריסה ארגונית — ניקח את ה-Pipelines האלה ונריץ אותם בסביבות production אמיתיות, עם isolation, אבטחה, ו-scaling.
- ☐ מיפיתי תהליך עבודה חוזר שלי כ-pipeline של שלבים
- ☐ כתבתי Stage Specification לכל שלב ב-pipeline שלי (Name, Input, Output, Success Criteria, Failure Action)
- ☐ בניתי Orchestrator עובד (Python או TypeScript) שמריץ שלבים בסדר
- ☐ הוספתי Checkpoint אחרי כל שלב מוצלח — עם יכולת Resume
- ☐ יצרתי Error Matrix שמגדיר retry/skip/abort לכל שלב וסוג שגיאה
- ☐ הוספתי retry עם exponential backoff לשלבים עם שגיאות חולפות
- ☐ הוספתי data validation בין שלבים — בדיקה שה-output תקין לפני שעובר הלאה
- ☐ הקמתי prompt versioning — תיקיית prompts/ עם גרסאות לכל שלב
- ☐ הוספתי structured logging ל-pipeline — JSON log עם זמן, עלות, טוקנים, תוצאה
- ☐ הוספתי cost tracking שמסכם עלות כוללת וזיהוי שלב יקר
- ☐ זיהיתי שלבים שאפשר להריץ במקביל ומימשתי לפחות אחד
- ☐ הרצתי pipeline מלא על input אמיתי ובדקתי output + logs + checkpoints
- ☐ כתבתי מסמך Pipeline Design לדומיין שלי עם 5-7 שלבים מתועדים
- ☐ הגדרתי cost budget — per-run limit ו-daily limit