Python client for Amp - a database for blockchain data.
Features:
- Query Client: Issue Flight SQL queries to Amp servers
- Admin Client: Manage datasets, deployments, and jobs programmatically
- Registry Client: Discover, search, and publish datasets to the Registry
- Dataset Inspection: Explore dataset schemas with `inspect()` and `describe()` methods
- Data Loaders: Zero-copy loading into PostgreSQL, Redis, Snowflake, Delta Lake, Iceberg, and more
- Parallel Streaming: High-throughput parallel data ingestion with automatic resume
- Manifest Generation: Fluent API for creating and deploying datasets from SQL queries
- Auto-Refreshing Auth: Seamless authentication with automatic token refresh
You will need Rust and `uv` installed locally.

Install Rust:

```bash
brew install rust
```

Install dependencies:

```bash
uv build
```

Activate a virtual environment. Python 3.13 is the highest version supported:

```bash
brew install [email protected]
uv venv --python 3.13
```
```python
from amp import Client

# Connect to Amp server
client = Client(url="grpc://localhost:8815")

# Execute query and convert to pandas
df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_arrow().to_pandas()
print(df)
```

```python
from amp import Client

# Connect with admin capabilities
client = Client(
    query_url="grpc://localhost:8815",
    admin_url="http://localhost:8080",
    auth_token="your-token"
)

# Register and deploy a dataset
job = (
    client.sql("SELECT block_num, hash FROM eth.blocks")
    .with_dependency('eth', '_/[email protected]')
    .register_as('_', 'my_dataset', '1.0.0', 'blocks', 'mainnet')
    .deploy(parallelism=4, end_block='latest', wait=True)
)
print(f"Deployment completed: {job.status}")

# Load query results into PostgreSQL
result = client.sql("SELECT * FROM eth.blocks").load(
    connection='my_pg_connection',
    destination='eth_blocks'
)
print(f"Loaded {result.rows_loaded} rows")
```

The client supports three authentication methods (in priority order):
```python
from amp import Client

# 1. Explicit token (highest priority)
client = Client(
    url="grpc://localhost:8815",
    auth_token="your-token"
)

# 2. Environment variable
# export AMP_AUTH_TOKEN="your-token"
client = Client(url="grpc://localhost:8815")

# 3. Shared auth file (auto-refresh, recommended)
# Uses ~/.amp/cache/amp_cli_auth (shared with TypeScript CLI)
client = Client(
    url="grpc://localhost:8815",
    auth=True  # Automatically refreshes expired tokens
)
```

```python
from amp import Client

# Connect with registry support
client = Client(
    query_url="grpc://localhost:8815",
    registry_url="https://api.registry.amp.staging.thegraph.com",
    auth=True
)

# Search for datasets
results = client.registry.datasets.search('ethereum blocks')
for dataset in results.datasets[:5]:
    print(f"{dataset.namespace}/{dataset.name} - {dataset.description}")

# Get dataset details
dataset = client.registry.datasets.get('edgeandnode', 'ethereum-mainnet')
print(f"Latest version: {dataset.latest_version}")

# Inspect dataset schema
client.registry.datasets.inspect('edgeandnode', 'ethereum-mainnet')
```

Explore dataset schemas before querying:
```python
from amp.registry import RegistryClient

client = RegistryClient()

# Pretty-print dataset structure (interactive)
client.datasets.inspect('edgeandnode', 'ethereum-mainnet')
# Output:
# Dataset: edgeandnode/ethereum-mainnet@latest
#
# blocks (21 columns)
#   block_num   UInt64                 NOT NULL
#   timestamp   Timestamp(Nanosecond)  NOT NULL
#   hash        FixedSizeBinary(32)    NOT NULL
#   ...

# Get structured schema data (programmatic)
schema = client.datasets.describe('edgeandnode', 'ethereum-mainnet')

# Find tables with specific columns
for table_name, columns in schema.items():
    col_names = [col['name'] for col in columns]
    if 'block_num' in col_names:
        print(f"Table '{table_name}' has block_num column")

# Find all address columns (20-byte binary)
for table_name, columns in schema.items():
    addresses = [col['name'] for col in columns if col['type'] == 'FixedSizeBinary(20)']
    if addresses:
        print(f"{table_name}: {', '.join(addresses)}")
```

Start up a marimo workspace editor:
```bash
uv run marimo edit
```

The Marimo app will open a new browser tab where you can create a new notebook, view helpful resources, and browse existing notebooks in the workspace.
You can execute Python apps and scripts with `uv run <path>`, which gives them access to the project dependencies and the `amp` package. For example, run the `execute_query` app with the following command:
```bash
uv run apps/execute_query.py
```

- Admin Client Guide - Complete guide for dataset management and deployment
- Registry Guide - Discover and search datasets in the Registry
- Dataset Inspection - Explore dataset schemas with `inspect()` and `describe()`
- Admin API Reference - Full API documentation for admin operations
- Parallel Streaming Usage Guide - User guide for high-throughput parallel data loading
- Parallel Streaming Design - Technical design documentation for parallel streaming architecture
- Reorganization Handling - Guide for handling blockchain reorganizations
- Implementing Data Loaders - Guide for creating custom data loaders
To operate a local Amp server, you need the files that `dump` produces available locally, and then run the server. You can then use it from your Python scripts, apps, or notebooks.
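Once the local server is running, point the client at it the same way as in the quick start. A minimal sketch, assuming the server listens on the default local port used above (adjust the URL and table name for your setup):

```python
from amp import Client

# Connect to the locally running Amp server (default quick-start port assumed)
client = Client(url="grpc://localhost:8815")

# Run a small query to confirm the server is serving the dumped files
df = client.sql("SELECT * FROM eth.blocks LIMIT 5").to_arrow().to_pandas()
print(df)
```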
The project uses the pytest testing framework and follows standard Python test discovery rules.
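In practice that means pytest collects files named `test_*.py` and functions prefixed with `test_`. A minimal sketch of a test that would be discovered (the file path and assertion are illustrative, not part of the actual suite):

```python
# tests/unit/test_example.py -- picked up because the file name starts with "test_"

def test_addition_is_commutative():
    # Functions prefixed with "test_" are collected and run by pytest
    assert 1 + 2 == 2 + 1
```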
Run all tests:

```bash
uv run pytest
```

Run only unit tests (fast, no external dependencies):

```bash
make test-unit
```

Run integration tests with automatic container setup:

```bash
make test-integration
```

Run all tests with coverage:

```bash
make test-all
```

Integration tests can run in two modes:
The integration tests will automatically spin up PostgreSQL and Redis containers using testcontainers. This is the default mode and requires Docker to be installed and running.
```bash
# Run integration tests with automatic containers
uv run pytest tests/integration/ -m integration
```

Note: The configuration automatically disables Ryuk (the testcontainers cleanup container) to avoid Docker connectivity issues. If you need Ryuk enabled, set `TESTCONTAINERS_RYUK_DISABLED=false`.
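If you are writing new integration tests, the testcontainers library can provide the throwaway database directly from a fixture. A minimal sketch, assuming a PostgreSQL-backed test; the fixture name, image tag, and assertion are illustrative and may differ from how the actual suite wires its containers:

```python
import pytest
from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="session")
def postgres_url():
    # Start a disposable PostgreSQL container for the test session;
    # it is torn down automatically when the "with" block exits.
    with PostgresContainer("postgres:16") as pg:
        yield pg.get_connection_url()


def test_container_provides_connection_url(postgres_url):
    # Placeholder check; a real test would open a connection and load data.
    assert postgres_url.startswith("postgresql")
```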
If you prefer to use your own database instances, you can disable testcontainers:
```bash
# Disable testcontainers and use manual configuration
export USE_TESTCONTAINERS=false

# Configure your database connections
export POSTGRES_HOST=localhost
export POSTGRES_PORT=5432
export POSTGRES_DB=test_amp
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=yourpassword
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=yourpassword  # Optional

# Run tests
uv run pytest tests/integration/ -m integration
```

For manual setup, you can use the provided Makefile commands:
```bash
# Start test databases manually
make test-setup

# Run tests
make test-integration

# Clean up databases
make test-cleanup
```

Run tests for specific loaders:
```bash
make test-postgresql  # PostgreSQL tests
make test-redis       # Redis tests
make test-deltalake   # Delta Lake tests
make test-iceberg     # Iceberg tests
make test-lmdb        # LMDB tests
```

Run tests for specific features:
```bash
make test-parallel-streaming  # Parallel streaming integration tests (requires Amp server)
```

Note: Parallel streaming tests require an Amp server. Configure it using environment variables in `.test.env`:
- `AMP_SERVER_URL` - Amp server URL (e.g., `grpc://your-server:80`)
- `AMP_TEST_TABLE` - Source table name (e.g., `eth_firehose.blocks`)
- `AMP_TEST_BLOCK_COLUMN` - Block column name (default: `block_num`)
- `AMP_TEST_MAX_BLOCK` - Max block for testing (default: `1000`)
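For example, a `.test.env` built from the variables above might look like this; the server URL and table name are placeholders, not real endpoints:

```bash
# .test.env -- example values only; point these at your own Amp server
AMP_SERVER_URL=grpc://your-server:80
AMP_TEST_TABLE=eth_firehose.blocks
AMP_TEST_BLOCK_COLUMN=block_num
AMP_TEST_MAX_BLOCK=1000
```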
Ruff is used for linting and formatting in this project.
Run the formatter:

```bash
uv run ruff format
```

Run the linter:

```bash
uv run ruff check .
```

Run the linter and apply auto-fixes:

```bash
uv run ruff check . --fix
```