Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/automation/artifact_payloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub(super) fn traces_payload(ctx: &ArtifactPayloadContext<'_>) -> Value {
"response_schema": ctx.request.contract.response_schema,
"strict_json": ctx.request.contract.strict_json,
"evidence_hash": ctx.record.evidence_hash,
"evidence_mode": context_evidence_mode(&ctx.request.context),
"input_hash": ctx.record.input_hash,
"output_hash": ctx.record.output_hash,
"context_keys": ctx.request.context.as_object()
Expand All @@ -79,6 +80,22 @@ pub(super) fn traces_payload(ctx: &ArtifactPayloadContext<'_>) -> Value {
})
}

/// Looks up the `evidence_mode` label carried by the request context.
///
/// The top-level values of `context` are scanned (e.g. the
/// `session_reflection_evidence` or `skill_writer_evidence` objects a task
/// stores there) and the first string-valued `evidence_mode` field is returned
/// as a clone, letting traces tell session-replay-backed runs apart from
/// grep-only runs.
///
/// Returns `Value::Null` when `context` is not a JSON object or when none of
/// its values holds a string `evidence_mode`.
fn context_evidence_mode(context: &Value) -> Value {
    let Some(entries) = context.as_object() else {
        return Value::Null;
    };
    for entry in entries.values() {
        // Non-string labels are ignored so the trace field is either a string or null.
        match entry.get("evidence_mode") {
            Some(label) if label.is_string() => return label.clone(),
            _ => {}
        }
    }
    Value::Null
}

pub(super) fn feedback_payload(ctx: &ArtifactPayloadContext<'_>, trace_ref: &Value) -> Value {
json!({
"schema_version": 1,
Expand Down
212 changes: 205 additions & 7 deletions src/automation/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use crate::global_db::GlobalDb;
use crate::sessions::cursor::{
resolve_hermes_profile_session_db_readonly, HermesProfileDbReadOnly,
};
use crate::sessions::lcm::{LcmGrepRequest, LcmGrepSort, LcmScope};
use crate::sessions::lcm::{
LcmGrepRequest, LcmGrepSort, LcmScope, LcmSessionReplayRequest, LcmSessionReplaySlice,
};
use crate::tracedecay::{current_timestamp, TraceDecay};

pub use super::memory_curator::{
Expand All @@ -43,6 +45,14 @@ pub use super::memory_curator::{

const SKILL_ANALYTICS_IMPORT_LIMIT: usize = 2_000;

/// Bounds for the session-replay evidence channel. Worst case per session is
/// `(4 + 4) * 500 + 3 * 700 = 6_100` snippet chars, so the default three
/// sessions stay under ~5k tokens alongside the grep hits.
///
/// Passed as `head_limit` of every session replay request.
const SESSION_REPLAY_HEAD_TURNS: usize = 4;
/// Passed as `tail_limit` of every session replay request.
const SESSION_REPLAY_TAIL_TURNS: usize = 4;
/// Passed as `max_snippet_chars` of every session replay request.
const SESSION_REPLAY_SNIPPET_CHARS: usize = 500;
/// Passed as `summary_limit` of every session replay request.
const SESSION_REPLAY_SUMMARY_NODES: usize = 3;
/// Passed as `max_summary_chars` of every session replay request.
const SESSION_REPLAY_SUMMARY_CHARS: usize = 700;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SessionReflectorAutomationOptions {
#[serde(default)]
Expand All @@ -65,6 +75,14 @@ pub struct SessionReflectorAutomationOptions {
pub include_summaries: bool,
#[serde(default = "default_session_evidence_limit")]
pub evidence_limit: usize,
/// When true, include bounded turn-ordered slices of recently active
/// sessions as a primary evidence channel alongside the keyword grep.
#[serde(default = "default_include_recent_sessions")]
pub include_recent_sessions: bool,
/// How many recently active sessions to replay when `session_id` is not
/// explicitly set.
#[serde(default = "default_recent_sessions_limit")]
pub recent_sessions_limit: usize,
#[serde(default = "default_lcm_grep_sort")]
pub sort: LcmGrepSort,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand All @@ -90,6 +108,8 @@ impl Default for SessionReflectorAutomationOptions {
session_id: None,
include_summaries: default_include_summaries(),
evidence_limit: default_session_evidence_limit(),
include_recent_sessions: default_include_recent_sessions(),
recent_sessions_limit: default_recent_sessions_limit(),
sort: default_lcm_grep_sort(),
source: None,
role: None,
Expand Down Expand Up @@ -124,6 +144,13 @@ pub struct SkillWriterAutomationOptions {
pub query: String,
#[serde(default = "default_skill_writer_evidence_limit")]
pub evidence_limit: usize,
/// When true, include bounded turn-ordered slices of recently active
/// sessions as a primary evidence channel alongside the keyword grep.
#[serde(default = "default_include_recent_sessions")]
pub include_recent_sessions: bool,
/// How many recently active sessions to replay.
#[serde(default = "default_recent_sessions_limit")]
pub recent_sessions_limit: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub profile_root: Option<PathBuf>,
}
Expand All @@ -138,6 +165,8 @@ impl Default for SkillWriterAutomationOptions {
provider: default_skill_writer_provider(),
query: default_skill_writer_query(),
evidence_limit: default_skill_writer_evidence_limit(),
include_recent_sessions: default_include_recent_sessions(),
recent_sessions_limit: default_recent_sessions_limit(),
profile_root: None,
}
}
Expand Down Expand Up @@ -684,7 +713,28 @@ async fn build_session_reflector_evidence(
.map_err(|e| TraceDecayError::Config {
message: format!("failed to build session reflection evidence: {e}"),
})?;
let recent_session_slices = if options.include_recent_sessions
&& session_reflector_replay_allowed(
options.scope,
session_id.as_deref(),
source.as_deref(),
role.as_deref(),
options.start_time,
options.end_time,
) {
recent_session_replay_evidence(
&lcm_db,
&provider,
session_id.as_deref(),
options.recent_sessions_limit,
"session_reflector",
)
.await?
} else {
None
};
let evidence = json!({
"evidence_mode": evidence_mode_label(recent_session_slices.is_some()),
"storage_scope": storage_scope,
"hermes_home": options.hermes_home.as_ref().map(|path| path.display().to_string()),
"provider": provider,
Expand All @@ -697,14 +747,19 @@ async fn build_session_reflector_evidence(
"role": role,
"start_time": options.start_time,
"end_time": options.end_time,
"recent_session_slices": recent_session_slices,
"hits": hits,
});
let evidence_hash = Some(sha256_json(&evidence));
if evidence
let has_grep_hits = evidence
.get("hits")
.and_then(Value::as_array)
.is_none_or(Vec::is_empty)
{
.is_some_and(|hits| !hits.is_empty());
let has_replay_sessions = evidence
.pointer("/recent_session_slices/sessions")
.and_then(Value::as_array)
.is_some_and(|sessions| !sessions.is_empty());
if !has_grep_hits && !has_replay_sessions {
return Ok(SessionReflectorEvidenceOutcome::Skipped {
reason: "no_session_evidence",
evidence_hash,
Expand Down Expand Up @@ -772,6 +827,18 @@ async fn build_skill_writer_evidence(
.map_err(|e| TraceDecayError::Config {
message: format!("failed to build skill writer evidence: {e}"),
})?;
let recent_session_slices = if options.include_recent_sessions {
recent_session_replay_evidence(
&lcm_db,
&provider,
None,
options.recent_sessions_limit,
"skill_writer",
)
.await?
} else {
None
};
let existing_skills = list_managed_skills(&profile_root).await?;
let global_db = GlobalDb::open().await;
ingest_project_analytics_events(
Expand Down Expand Up @@ -805,10 +872,12 @@ async fn build_skill_writer_evidence(
&underused_tool_families,
);
let evidence = json!({
"evidence_mode": evidence_mode_label(recent_session_slices.is_some()),
"storage_scope": storage_scope,
"hermes_home": options.hermes_home.as_ref().map(|path| path.display().to_string()),
"provider": provider,
"query": query,
"recent_session_slices": recent_session_slices,
"hits": hits,
"skill_usage_summaries": skill_usage_summaries,
"stale_recommendations": stale_recommendations,
Expand All @@ -834,11 +903,15 @@ async fn build_skill_writer_evidence(
.collect::<Vec<_>>(),
});
let evidence_hash = Some(sha256_json(&evidence));
if evidence
let has_grep_hits = evidence
.get("hits")
.and_then(Value::as_array)
.is_none_or(Vec::is_empty)
{
.is_some_and(|hits| !hits.is_empty());
let has_replay_sessions = evidence
.pointer("/recent_session_slices/sessions")
.and_then(Value::as_array)
.is_some_and(|sessions| !sessions.is_empty());
if !has_grep_hits && !has_replay_sessions {
return Ok(SkillWriterEvidenceOutcome::Skipped {
reason: "no_skill_writer_evidence",
evidence_hash,
Expand Down Expand Up @@ -1236,6 +1309,7 @@ fn build_combined_review_prompt(reflector_evidence: &Value, skill_evidence: &Val
fn build_session_reflector_prompt(evidence: &Value) -> String {
const POLICY: &str = concat!(
"Review these bounded TraceDecay session snippets and propose only durable memory facts.\n",
"Evidence has two channels: recent_session_slices holds turn-ordered head/tail turns and summary nodes replayed from recently active sessions, and hits holds keyword search matches; both are citable.\n",
"\n",
"Signals worth capturing (any one is enough):\n",
"- The user revealed durable preferences, persona, expectations, or ways they want the agent to operate.\n",
Expand All @@ -1262,6 +1336,7 @@ fn build_session_reflector_prompt(evidence: &Value) -> String {
fn build_skill_writer_prompt(evidence: &Value) -> String {
const POLICY: &str = concat!(
"Review these bounded TraceDecay session snippets and propose only reusable managed skills for repeated workflows, corrections, or tool-use patterns.\n",
"Evidence has two channels: recent_session_slices holds turn-ordered head/tail turns and summary nodes replayed from recently active sessions, and hits holds keyword search matches.\n",
"\n",
"Target shape of the skill library: CLASS-LEVEL umbrella skills, each with a rich body and support files for session-specific detail — not a long flat list of narrow one-session-one-skill entries. This shapes HOW you update, not WHETHER you update.\n",
"\n",
Expand Down Expand Up @@ -1302,6 +1377,121 @@ fn normalized_non_empty(value: &str) -> Option<String> {
}
}

/// One session selected for replay, identified by its provider and session id.
struct ReplaySessionTarget {
/// Provider name, moved into the `provider` field of the replay request.
provider: String,
/// Session identifier, moved into the `session_id` field of the replay request.
session_id: String,
}

/// Assembles the session-replay evidence channel for an automation task.
///
/// Session selection:
/// - With `explicit_session_id`, that session is replayed for `provider`, or,
///   when `provider` is `"all"`, once per provider returned by
///   `lcm_session_providers`. Reported as `"explicit_session_id"`.
/// - Otherwise the sessions returned by `lcm_recent_sessions` are replayed,
///   with `sessions_limit` clamped into `1..=10`. Reported as
///   `"recent_activity"`.
///
/// Every target is replayed with the `SESSION_REPLAY_*` bounds. A slice that
/// has neither messages nor summary nodes is dropped.
///
/// Returns `Ok(None)` when no slice survives, so callers can fall back to
/// grep-only evidence. Any LCM failure is mapped to `TraceDecayError::Config`
/// with a message that names `task_name`.
async fn recent_session_replay_evidence(
    lcm_db: &GlobalDb,
    provider: &str,
    explicit_session_id: Option<&str>,
    sessions_limit: usize,
    task_name: &str,
) -> Result<Option<Value>> {
    let sessions_limit = sessions_limit.clamp(1, 10);
    // "all" is the wildcard provider: no provider filter is applied.
    let provider_filter = if provider == "all" {
        None
    } else {
        Some(provider)
    };
    let replay_error = |e: crate::sessions::lcm::LcmError| TraceDecayError::Config {
        message: format!("failed to build {task_name} session replay evidence: {e}"),
    };

    let (session_selection, targets): (&str, Vec<ReplaySessionTarget>) =
        match explicit_session_id {
            Some(session_id) => {
                let provider_names = if let Some(name) = provider_filter {
                    vec![name.to_string()]
                } else {
                    lcm_db
                        .lcm_session_providers(session_id)
                        .await
                        .map_err(replay_error)?
                };
                let explicit_targets = provider_names
                    .into_iter()
                    .map(|name| ReplaySessionTarget {
                        provider: name,
                        session_id: session_id.to_string(),
                    })
                    .collect();
                ("explicit_session_id", explicit_targets)
            }
            None => {
                let recent = lcm_db
                    .lcm_recent_sessions(provider_filter, sessions_limit)
                    .await
                    .map_err(replay_error)?;
                let recent_targets = recent
                    .into_iter()
                    .map(|session| ReplaySessionTarget {
                        provider: session.provider,
                        session_id: session.session_id,
                    })
                    .collect();
                ("recent_activity", recent_targets)
            }
        };

    let mut sessions: Vec<LcmSessionReplaySlice> = Vec::new();
    for ReplaySessionTarget {
        provider: target_provider,
        session_id: target_session,
    } in targets
    {
        let request = LcmSessionReplayRequest {
            provider: target_provider,
            session_id: target_session,
            head_limit: SESSION_REPLAY_HEAD_TURNS,
            tail_limit: SESSION_REPLAY_TAIL_TURNS,
            max_snippet_chars: SESSION_REPLAY_SNIPPET_CHARS,
            summary_limit: SESSION_REPLAY_SUMMARY_NODES,
            max_summary_chars: SESSION_REPLAY_SUMMARY_CHARS,
        };
        let slice = lcm_db
            .lcm_session_replay_slice(&request)
            .await
            .map_err(replay_error)?;
        // Keep only slices that carry something citable.
        if slice.total_messages != 0 || !slice.summary_nodes.is_empty() {
            sessions.push(slice);
        }
    }

    if sessions.is_empty() {
        return Ok(None);
    }

    // The applied bounds are echoed into the evidence object.
    let bounds = json!({
        "head_turns": SESSION_REPLAY_HEAD_TURNS,
        "tail_turns": SESSION_REPLAY_TAIL_TURNS,
        "snippet_chars": SESSION_REPLAY_SNIPPET_CHARS,
        "summary_nodes": SESSION_REPLAY_SUMMARY_NODES,
        "summary_chars": SESSION_REPLAY_SUMMARY_CHARS,
    });
    Ok(Some(json!({
        "mode": "recent_sessions",
        "session_selection": session_selection,
        "sessions_limit": sessions_limit,
        "bounds": bounds,
        "sessions": sessions,
    })))
}

/// Maps the presence of replay evidence to the `evidence_mode` label stored
/// in run artifacts: `"session_replay_with_grep"` when `has_replay` is true,
/// `"grep_only"` otherwise.
fn evidence_mode_label(has_replay: bool) -> &'static str {
    match has_replay {
        true => "session_replay_with_grep",
        false => "grep_only",
    }
}

/// Decides whether the session reflector may add the session-replay channel.
///
/// Replay is refused as soon as any of `source`, `role`, `start_time` or
/// `end_time` is set. Without those filters it is allowed when `scope` is
/// `LcmScope::All` or when a `session_id` was given.
fn session_reflector_replay_allowed(
    scope: LcmScope,
    session_id: Option<&str>,
    source: Option<&str>,
    role: Option<&str>,
    start_time: Option<i64>,
    end_time: Option<i64>,
) -> bool {
    let has_narrowing_filter =
        source.is_some() || role.is_some() || start_time.is_some() || end_time.is_some();
    let scope_permits_replay = session_id.is_some() || matches!(scope, LcmScope::All);

    !has_narrowing_filter && scope_permits_replay
}

/// Resolves the LCM sessions database for an automation task, reporting
/// `NotIngested` when the store does not exist yet so callers can skip
/// without re-checking the path.
Expand Down Expand Up @@ -1372,6 +1562,14 @@ fn default_session_evidence_limit() -> usize {
20
}

/// Serde/`Default` fallback for `include_recent_sessions`: session replay is
/// on unless configured otherwise.
fn default_include_recent_sessions() -> bool {
    const REPLAY_ENABLED_BY_DEFAULT: bool = true;
    REPLAY_ENABLED_BY_DEFAULT
}

/// Serde/`Default` fallback for `recent_sessions_limit`: three sessions.
fn default_recent_sessions_limit() -> usize {
    const DEFAULT_RECENT_SESSIONS: usize = 3;
    DEFAULT_RECENT_SESSIONS
}

/// Returns the default keyword query for the skill writer as an owned string.
fn default_skill_writer_query() -> String {
    String::from("workflow correction repeated skill tool pattern")
}
Expand Down
Loading
Loading