tx

tx send / tx inbox

Channel-based agent-to-agent messaging

Purpose

tx send and tx inbox provide channel-based messaging for agent-to-agent communication. Messages are persistent, cursor-based, and support acknowledgment for reliable delivery.

Key properties:

  • Channel-based: Messages are organized into named channels (e.g., agent ID, topic, task:tx-abc123)
  • Cursor-based fan-out: Multiple consumers read from the same channel using afterId cursors
  • Read-only inbox: Reading messages has no side effects -- explicit tx ack is required
  • TTL support: Messages can auto-expire after a configurable duration
  • Correlation IDs: Request/reply patterns via correlationId matching

Send a Message

tx send <channel> <content> [options]

Arguments

ArgumentRequiredDescription
<channel>YesChannel name (e.g., agent-1, task:tx-abc123)
<content>YesMessage content

Options

OptionDescription
--sender <s>Sender name (default: cli)
--task <id>Associated task ID
--ttl <sec>Time-to-live in seconds
--correlation <id>Correlation ID for request/reply
--metadata '{}'JSON metadata string
--jsonOutput as JSON

Examples

# Send a simple message
tx send agent-1 "Task tx-abc123 is ready for review"

# Send with metadata and TTL
tx send builds "Deploy v2.3.1 complete" --sender ci --ttl 3600 --metadata '{"version":"2.3.1"}'

# Send with correlation for request/reply
tx send planner "Need architecture review" --correlation req-001

# JSON output
tx send agent-1 "hello" --json
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const message = await tx.messages.send({
  channel: 'agent-1',
  content: 'Task tx-abc123 is ready for review',
  sender: 'orchestrator',
  ttlSeconds: 3600
})
// Returns: { id, channel, sender, content, status, correlationId, taskId, metadata, createdAt, ackedAt, expiresAt }
{
  "tool": "tx_send",
  "arguments": {
    "channel": "agent-1",
    "content": "Task tx-abc123 is ready for review",
    "sender": "planner",
    "taskId": "tx-abc123",
    "ttlSeconds": 3600,
    "correlationId": "req-001",
    "metadata": "{\"priority\":\"high\"}"
  }
}

Parameters

ParameterRequiredDescription
channelYesChannel name
contentYesMessage content
senderNoSender name (default: mcp)
taskIdNoAssociated task ID
ttlSecondsNoTime-to-live in seconds
correlationIdNoCorrelation ID for request/reply
metadataNoJSON metadata string
POST /api/messages
Content-Type: application/json

{
  "channel": "agent-1",
  "content": "Task tx-abc123 is ready for review",
  "sender": "orchestrator",
  "taskId": "tx-abc123",
  "ttlSeconds": 3600,
  "correlationId": "req-001",
  "metadata": { "priority": "high" }
}

Response (201)

{
  "id": 1,
  "channel": "agent-1",
  "sender": "orchestrator",
  "content": "Task tx-abc123 is ready for review",
  "status": "pending",
  "correlationId": "req-001",
  "taskId": "tx-abc123",
  "metadata": { "priority": "high" },
  "createdAt": "2025-01-28T10:00:00.000Z",
  "ackedAt": null,
  "expiresAt": "2025-01-28T11:00:00.000Z"
}

Example

curl -X POST http://localhost:3456/api/messages \
  -H "Content-Type: application/json" \
  -d '{"channel": "agent-1", "content": "hello", "sender": "orchestrator"}'

Read Inbox

Read messages from a channel. This is a pure read with no side effects -- messages are not marked as read or consumed.

tx inbox <channel> [options]

Arguments

ArgumentRequiredDescription
<channel>YesChannel to read from

Options

OptionDescription
--after <id>Cursor: only messages with ID greater than this value
--limit <n>Max messages to return
--sender <s>Filter by sender
--correlation <id>Filter by correlation ID
--include-ackedInclude acknowledged messages
--jsonOutput as JSON

Examples

# Read all pending messages
tx inbox agent-1

# Cursor-based pagination
tx inbox agent-1 --after 42

# Filter by sender
tx inbox agent-1 --sender planner

# Include already-acknowledged messages
tx inbox agent-1 --include-acked

# JSON output for scripting
tx inbox agent-1 --json

Output

[1] 2025-01-28 10:00:00 orchestrator: Task tx-abc123 is ready for review
[2] 2025-01-28 10:05:00 ci: Build passed for branch feature/auth

2 message(s)
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const messages = await tx.messages.inbox('agent-1', { limit: 10, afterId: 42 })
// messages: Array<{ id, channel, sender, content, status, ... }>
{
  "tool": "tx_inbox",
  "arguments": {
    "channel": "agent-1",
    "afterId": 42,
    "limit": 10,
    "sender": "planner",
    "includeAcked": false
  }
}

Parameters

ParameterRequiredDescription
channelYesChannel to read from
afterIdNoCursor: only messages with ID > this value
limitNoMax messages (default: 50)
senderNoFilter by sender
correlationIdNoFilter by correlation ID
includeAckedNoInclude acknowledged messages
GET /api/messages/inbox/:channel

Query Parameters

ParameterDescription
afterIdCursor: only messages with ID > this value
limitMax messages to return
senderFilter by sender
correlationIdFilter by correlation ID
includeAckedInclude acknowledged messages (true/false)

Response (200)

{
  "messages": [
    {
      "id": 1,
      "channel": "agent-1",
      "sender": "orchestrator",
      "content": "Task tx-abc123 is ready for review",
      "status": "pending",
      "correlationId": null,
      "taskId": null,
      "metadata": {},
      "createdAt": "2025-01-28T10:00:00.000Z",
      "ackedAt": null,
      "expiresAt": null
    }
  ],
  "channel": "agent-1",
  "count": 1
}

Example

curl http://localhost:3456/api/messages/inbox/agent-1?limit=10&afterId=0

Acknowledge a Message

Mark a single message as acknowledged. This transitions its status from pending to acked.

tx ack <message-id> [options]

Arguments

ArgumentRequiredDescription
<message-id>YesNumeric message ID

Options

OptionDescription
--jsonOutput as JSON

Examples

tx ack 42
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const message = await tx.messages.ack(42)
{
  "tool": "tx_ack",
  "arguments": {
    "id": 42
  }
}
POST /api/messages/:id/ack

Response (200)

{
  "message": {
    "id": 42,
    "channel": "agent-1",
    "sender": "orchestrator",
    "content": "Task tx-abc123 is ready for review",
    "status": "acked",
    "correlationId": null,
    "taskId": null,
    "metadata": {},
    "createdAt": "2025-01-28T10:00:00.000Z",
    "ackedAt": "2025-01-28T10:15:00.000Z",
    "expiresAt": null
  }
}

Example

curl -X POST http://localhost:3456/api/messages/42/ack

Acknowledge All Messages

Acknowledge all pending messages on a channel at once.

tx ack:all <channel> [options]

Arguments

ArgumentRequiredDescription
<channel>YesChannel to acknowledge

Options

OptionDescription
--jsonOutput as JSON

Examples

tx ack:all agent-1
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const { channel, ackedCount } = await tx.messages.ackAll('agent-1')
{
  "tool": "tx_ack_all",
  "arguments": {
    "channel": "agent-1"
  }
}
POST /api/messages/inbox/:channel/ack

Response (200)

{
  "channel": "agent-1",
  "ackedCount": 5
}

Example

curl -X POST http://localhost:3456/api/messages/inbox/agent-1/ack

Pending Count

Count unacknowledged messages on a channel.

tx outbox:pending <channel> [options]

Options

OptionDescription
--jsonOutput as JSON

Examples

tx outbox:pending agent-1
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const count = await tx.messages.pending('agent-1')
{
  "tool": "tx_outbox_pending",
  "arguments": {
    "channel": "agent-1"
  }
}
GET /api/messages/inbox/:channel/count

Response (200)

{
  "channel": "agent-1",
  "count": 3
}

Example

curl http://localhost:3456/api/messages/inbox/agent-1/count

Garbage Collection

Clean up expired and old acknowledged messages.

tx outbox:gc [options]

Options

OptionDescription
--acked-older-than <hours>Remove acked messages older than N hours
--jsonOutput as JSON

Examples

# Remove expired + acked messages older than 24 hours
tx outbox:gc --acked-older-than 24
import { TxClient } from '@jamesaphoenix/tx-agent-sdk'

const tx = new TxClient({ apiUrl: 'http://localhost:3456' })

const { expired, acked } = await tx.messages.gc({ ackedOlderThanHours: 24 })

There is no dedicated MCP tool for garbage collection. Use the REST API or CLI.

POST /api/messages/gc
Content-Type: application/json

{
  "ackedOlderThanHours": 24
}

Response (200)

{
  "expired": 3,
  "acked": 12
}

Example

curl -X POST http://localhost:3456/api/messages/gc \
  -H "Content-Type: application/json" \
  -d '{"ackedOlderThanHours": 24}'

Message Schema

Every message contains:

FieldTypeDescription
idnumberAuto-incrementing message ID
channelstringChannel the message belongs to
senderstringWho sent the message
contentstringMessage body
status"pending" | "acked"Current status
correlationIdstring | nullFor request/reply patterns
taskIdstring | nullAssociated task ID
metadataRecord<string, unknown>Arbitrary key-value data
createdAtstringISO 8601 timestamp
ackedAtstring | nullWhen the message was acknowledged
expiresAtstring | nullWhen the message expires (from TTL)

Use Case: Agent Coordination

Fan-out with Cursors

Multiple agents can read from the same channel independently using cursor-based pagination. Each agent tracks its own afterId cursor:

# Agent 1 reads messages
CURSOR_1=0
MESSAGES=$(tx inbox shared-work --after $CURSOR_1 --json)
CURSOR_1=$(echo "$MESSAGES" | jq -r '.[-1].id // 0')

# Agent 2 reads the same messages independently
CURSOR_2=0
MESSAGES=$(tx inbox shared-work --after $CURSOR_2 --json)
CURSOR_2=$(echo "$MESSAGES" | jq -r '.[-1].id // 0')

Request/Reply Pattern

Use correlationId to match responses to requests:

# Requester sends
tx send reviewer "Please review PR #42" --correlation review-42

# Responder reads and replies
tx inbox reviewer --json | jq -r '.[] | select(.correlationId == "review-42")'
tx send requester "LGTM, approved" --correlation review-42

# Requester reads the reply
tx inbox requester --correlation review-42 --json

Task-Scoped Channels

Use task:<id> channel naming to scope messages to a task:

# Send updates about a specific task
tx send "task:tx-abc123" "Started implementation" --task tx-abc123
tx send "task:tx-abc123" "Tests passing, ready for review" --task tx-abc123

# Read task-specific discussion
tx inbox "task:tx-abc123"

Agent Loop with Inbox Polling

#!/bin/bash
AGENT_ID="worker-$$"
CURSOR=0

while true; do
  # Check for new messages
  MESSAGES=$(tx inbox "$AGENT_ID" --after $CURSOR --json)
  COUNT=$(echo "$MESSAGES" | jq length)

  if [ "$COUNT" -gt 0 ]; then
    # Update cursor
    CURSOR=$(echo "$MESSAGES" | jq -r '.[-1].id')

    # Process each message
    echo "$MESSAGES" | jq -c '.[]' | while read -r msg; do
      CONTENT=$(echo "$msg" | jq -r '.content')
      MSG_ID=$(echo "$msg" | jq -r '.id')

      # Handle the message
      echo "Processing: $CONTENT"

      # Acknowledge when done
      tx ack "$MSG_ID"
    done
  fi

  sleep 2
done

On this page