Custom Data Sources
Guide to creating custom data source handlers.
Overview
Graphty's data source system is extensible. Create custom data sources to load graphs from APIs, databases, or specialized file formats.
DataSource Interface
All data sources extend the abstract DataSource class:
typescript
abstract class DataSource {
static type: string;
abstract load(config: object): AsyncGenerator<GraphData>;
}
interface GraphData {
nodes: NodeData[];
edges: EdgeData[];
}Creating a Custom Data Source
Basic Example
typescript
import { DataSource, GraphData, NodeData, EdgeData } from '@graphty/graphty-element';
class MyDataSource extends DataSource {
static type = 'my-api';
async *load(config: { url: string }): AsyncGenerator<GraphData> {
const response = await fetch(config.url);
const data = await response.json();
yield {
nodes: data.nodes,
edges: data.edges
};
}
}
// Register the data source
DataSource.register(MyDataSource);Using Your Data Source
typescript
await graph.loadFromDataSource('my-api', {
url: 'https://api.example.com/graph'
});Complete Example: REST API
typescript
import { DataSource, GraphData, NodeData, EdgeData } from '@graphty/graphty-element';
interface ApiConfig {
baseUrl: string;
apiKey?: string;
graphId: string;
}
class RestApiDataSource extends DataSource {
static type = 'rest-api';
async *load(config: ApiConfig): AsyncGenerator<GraphData> {
const headers: Record<string, string> = {
'Content-Type': 'application/json'
};
if (config.apiKey) {
headers['Authorization'] = `Bearer ${config.apiKey}`;
}
// Fetch nodes
const nodesResponse = await fetch(
`${config.baseUrl}/graphs/${config.graphId}/nodes`,
{ headers }
);
const nodesData = await nodesResponse.json();
// Fetch edges
const edgesResponse = await fetch(
`${config.baseUrl}/graphs/${config.graphId}/edges`,
{ headers }
);
const edgesData = await edgesResponse.json();
// Transform to Graphty format
const nodes: NodeData[] = nodesData.map((n: any) => ({
id: n.nodeId,
label: n.name,
...n.attributes
}));
const edges: EdgeData[] = edgesData.map((e: any) => ({
source: e.fromNode,
target: e.toNode,
weight: e.weight
}));
yield { nodes, edges };
}
}
DataSource.register(RestApiDataSource);Usage:
typescript
await graph.loadFromDataSource('rest-api', {
baseUrl: 'https://api.example.com',
apiKey: 'your-api-key',
graphId: 'graph-123'
});Chunked Loading
For large datasets, yield data in chunks:
typescript
class PaginatedDataSource extends DataSource {
static type = 'paginated';
async *load(config: { url: string; pageSize: number }): AsyncGenerator<GraphData> {
let page = 0;
let hasMore = true;
while (hasMore) {
const response = await fetch(
`${config.url}?page=${page}&size=${config.pageSize}`
);
const data = await response.json();
yield {
nodes: data.nodes,
edges: data.edges
};
hasMore = data.hasNextPage;
page++;
}
}
}
DataSource.register(PaginatedDataSource);This progressively loads the graph, with the UI updating after each chunk.
Streaming Data Source
For real-time data streams:
typescript
class WebSocketDataSource extends DataSource {
static type = 'websocket';
async *load(config: { wsUrl: string }): AsyncGenerator<GraphData> {
const ws = new WebSocket(config.wsUrl);
// Create a queue for incoming data
const queue: GraphData[] = [];
let resolve: (() => void) | null = null;
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
queue.push({
nodes: data.nodes || [],
edges: data.edges || []
});
if (resolve) {
resolve();
resolve = null;
}
};
// Wait for initial connection
await new Promise<void>((r) => { ws.onopen = () => r(); });
// Yield data as it arrives
try {
while (ws.readyState === WebSocket.OPEN) {
if (queue.length > 0) {
yield queue.shift()!;
} else {
await new Promise<void>((r) => { resolve = r; });
}
}
} finally {
ws.close();
}
}
}
DataSource.register(WebSocketDataSource);Custom File Format
Parse a custom file format:
typescript
class CustomFormatDataSource extends DataSource {
static type = 'custom-format';
async *load(config: { content: string }): AsyncGenerator<GraphData> {
const lines = config.content.split('\n');
const nodes: NodeData[] = [];
const edges: EdgeData[] = [];
let section = 'none';
for (const line of lines) {
const trimmed = line.trim();
if (trimmed === '[NODES]') {
section = 'nodes';
continue;
}
if (trimmed === '[EDGES]') {
section = 'edges';
continue;
}
if (!trimmed || trimmed.startsWith('#')) {
continue;
}
if (section === 'nodes') {
const [id, label] = trimmed.split(':');
nodes.push({ id, label });
}
if (section === 'edges') {
const [source, target, weight] = trimmed.split(',');
edges.push({
source,
target,
weight: parseFloat(weight) || 1
});
}
}
yield { nodes, edges };
}
}
DataSource.register(CustomFormatDataSource);Example custom format file:
# My Graph
[NODES]
a:Node A
b:Node B
c:Node C
[EDGES]
a,b,1.5
b,c,2.0
c,a,0.5Schema Validation
Use Zod for input validation:
typescript
import { z } from 'zod';
import { DataSource, GraphData } from '@graphty/graphty-element';
const ConfigSchema = z.object({
url: z.string().url(),
timeout: z.number().optional().default(5000),
retries: z.number().optional().default(3)
});
type Config = z.infer<typeof ConfigSchema>;
class ValidatedDataSource extends DataSource {
static type = 'validated';
async *load(config: unknown): AsyncGenerator<GraphData> {
// Validate config
const validConfig = ConfigSchema.parse(config);
// Use validated config
let lastError: Error | null = null;
for (let attempt = 0; attempt < validConfig.retries; attempt++) {
try {
const controller = new AbortController();
const timeoutId = setTimeout(
() => controller.abort(),
validConfig.timeout
);
const response = await fetch(validConfig.url, {
signal: controller.signal
});
clearTimeout(timeoutId);
const data = await response.json();
yield { nodes: data.nodes, edges: data.edges };
return;
} catch (error) {
lastError = error as Error;
}
}
throw lastError || new Error('Failed to load data');
}
}
DataSource.register(ValidatedDataSource);Error Handling
Handle errors gracefully:
typescript
class RobustDataSource extends DataSource {
static type = 'robust';
async *load(config: { urls: string[] }): AsyncGenerator<GraphData> {
const allNodes: NodeData[] = [];
const allEdges: EdgeData[] = [];
const errors: Error[] = [];
for (const url of config.urls) {
try {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data = await response.json();
if (!Array.isArray(data.nodes) || !Array.isArray(data.edges)) {
throw new Error('Invalid data format');
}
allNodes.push(...data.nodes);
allEdges.push(...data.edges);
// Yield partial results
yield { nodes: data.nodes, edges: data.edges };
} catch (error) {
errors.push(error as Error);
console.warn(`Failed to load ${url}:`, error);
}
}
if (allNodes.length === 0 && errors.length > 0) {
throw new AggregateError(errors, 'All data sources failed');
}
}
}
DataSource.register(RobustDataSource);Configuration Types
Export your config type for TypeScript users:
typescript
// my-data-source.ts
export interface MyDataSourceConfig {
url: string;
apiKey?: string;
options?: {
includeMetadata?: boolean;
maxNodes?: number;
};
}
class MyDataSource extends DataSource {
static type = 'my-source';
async *load(config: MyDataSourceConfig): AsyncGenerator<GraphData> {
// ...
}
}Usage with type checking:
typescript
import type { MyDataSourceConfig } from './my-data-source';
const config: MyDataSourceConfig = {
url: 'https://api.example.com/graph',
options: { maxNodes: 1000 }
};
await graph.loadFromDataSource('my-source', config);