A client came to us with a familiar problem: their CSV import was timing out. They run a retail operation with 500,000+ SKUs across multiple locations, and every time a brand sent them an inventory manifest (2,000 to 10,000 rows), the import would crash halfway through.
Their previous system took 20+ minutes to process large files (when it worked at all). After we rebuilt it with queue-driven architecture, the same imports complete in under 3 minutes. Here's exactly how.
Why Your CSV Import Crashes
Most CSV import problems come from the same root causes:
- Memory exhaustion: Loading 10,000 rows into memory at once blows past PHP's memory limit
- HTTP timeout: Web requests time out after 30-60 seconds, and large imports take longer
- Database bottleneck: 10,000 individual INSERT statements take forever
- No recovery: When it fails at row 5,000, you have no idea what imported and what didn't
- User confusion: "Is it still running? Did it finish? Should I upload again?"
The synchronous approach — upload file, process everything, return response — doesn't scale. You need to decouple the upload from the processing.
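To make the contrast concrete, here is a hypothetical sketch of that synchronous approach (not the client's actual code), assuming the same Product model used later: the whole file lands in memory, every row becomes its own query, and all of it runs inside a single HTTP request.

// Hypothetical "before" controller: everything inside one request
public function store(Request $request)
{
    // Loads the entire file into memory at once
    $lines = file(
        $request->file('file')->getRealPath(),
        FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES
    );
    $rows = array_map('str_getcsv', $lines);
    $header = array_shift($rows);

    foreach ($rows as $row) {
        // One query per row, and the request blocks until every row is done
        Product::create(array_combine($header, $row));
    }

    return back()->with('status', 'Imported'); // only reached if nothing timed out first
}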
The Architecture: Queue-Driven Processing
Here's the high-level flow:
User uploads CSV
│
▼
Controller validates & stores file
│
▼
Dispatch ProcessCsvImport job (returns immediately)
│
▼
User sees "Import started" + progress indicator
│
▼
Queue worker picks up job
│
▼
Job reads file in chunks (1000 rows each)
│
▼
Each chunk → dispatches batch of ProcessRow jobs
│
▼
Rows processed in parallel by multiple workers
│
▼
Import record updated with progress/completion
Key insight: The HTTP request returns in milliseconds. All the heavy lifting happens in the background. Users can navigate away, come back, and check progress.
The Implementation
Step 1: Track Import Status
First, create a model to track import progress:
// database/migrations/create_csv_imports_table.php
Schema::create('csv_imports', function (Blueprint $table) {
$table->id();
$table->string('filename');
$table->string('status')->default('pending'); // pending, processing, completed, failed
$table->integer('total_rows')->default(0);
$table->integer('processed_rows')->default(0);
$table->integer('failed_rows')->default(0);
$table->json('errors')->nullable();
$table->timestamp('started_at')->nullable();
$table->timestamp('completed_at')->nullable();
$table->timestamps();
});
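The migration needs a matching Eloquent model. A minimal sketch; the fillable list and the casts are assumptions, but the chunk job in Step 4 relies on errors behaving like an array:

// app/Models/CsvImport.php
use Illuminate\Database\Eloquent\Model;

class CsvImport extends Model
{
    protected $fillable = [
        'filename', 'status', 'total_rows', 'processed_rows',
        'failed_rows', 'errors', 'started_at', 'completed_at',
    ];

    protected $casts = [
        'errors' => 'array',          // JSON column read/written as a PHP array
        'started_at' => 'datetime',
        'completed_at' => 'datetime',
    ];
}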
Step 2: Handle the Upload
The controller validates, stores the file, and dispatches the job immediately:
// app/Http/Controllers/CsvImportController.php
public function store(Request $request)
{
$request->validate([
'file' => 'required|file|mimes:csv,txt|max:51200' // 50MB max
]);
// Store file
$path = $request->file('file')->store('imports');
// Count rows (fast, doesn't load into memory)
$rowCount = $this->countCsvRows(Storage::path($path));
// Create import record
$import = CsvImport::create([
'filename' => $path,
'total_rows' => $rowCount,
'status' => 'pending'
]);
// Dispatch job and return immediately
ProcessCsvImport::dispatch($import);
return response()->json([
'import_id' => $import->id,
'total_rows' => $rowCount,
'message' => 'Import started. Check status at /api/imports/' . $import->id
]);
}
private function countCsvRows(string $path): int
{
$count = 0;
$handle = fopen($path, 'r');
while (fgets($handle) !== false) {
$count++;
}
fclose($handle);
return max(0, $count - 1); // Subtract header row
}
Step 3: The Main Import Job
This job reads the CSV in chunks and dispatches batch jobs for each chunk:
// app/Jobs/ProcessCsvImport.php
class ProcessCsvImport implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(public CsvImport $import) {}
public function handle()
{
$this->import->update([
'status' => 'processing',
'started_at' => now()
]);
$path = Storage::path($this->import->filename);
$chunkSize = 1000;
$chunk = [];
$rowNumber = 0;
$header = null;
$handle = fopen($path, 'r');
while (($row = fgetcsv($handle)) !== false) {
// First row is header
if ($header === null) {
$header = $row;
continue;
}
$rowNumber++;
$chunk[] = [
'row_number' => $rowNumber,
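// Note: array_combine() fails if a row's column count differs from the header;
// pad, truncate, or skip malformed rows here in production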
'data' => array_combine($header, $row)
];
// Dispatch chunk when full
if (count($chunk) >= $chunkSize) {
ProcessCsvChunk::dispatch($this->import, $chunk);
$chunk = [];
}
}
// Dispatch remaining rows
if (!empty($chunk)) {
ProcessCsvChunk::dispatch($this->import, $chunk);
}
fclose($handle);
}
}
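Since ProcessCsvChunk already pulls in the Batchable trait (see Step 4), an optional variation is to dispatch the chunks as a single job batch and let Laravel report completion, instead of counting processed rows yourself. A hedged sketch, assuming job batching is set up (it requires the job_batches table that ships with Laravel's batching feature):

// Optional variation inside handle(): collect chunk jobs, then dispatch one batch
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

$jobs = [];
// ... inside the read loop: $jobs[] = new ProcessCsvChunk($this->import, $chunk); ...

$importId = $this->import->id;

Bus::batch($jobs)
    ->then(function (Batch $batch) use ($importId) {
        // All chunks finished successfully
        CsvImport::whereKey($importId)->update([
            'status' => 'completed',
            'completed_at' => now(),
        ]);
    })
    ->catch(function (Batch $batch, \Throwable $e) use ($importId) {
        // First chunk failure marks the import as failed
        CsvImport::whereKey($importId)->update(['status' => 'failed']);
    })
    ->dispatch();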
Step 4: Process Chunks with Batch Insert
Each chunk job validates and bulk-inserts rows:
// app/Jobs/ProcessCsvChunk.php
class ProcessCsvChunk implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels, Batchable;
public function __construct(
public CsvImport $import,
public array $rows
) {}
public function handle()
{
$validRows = [];
$errors = [];
foreach ($this->rows as $row) {
try {
// Validate and transform
$validated = $this->validateRow($row['data']);
$validRows[] = $validated;
} catch (\Exception $e) {
$errors[] = [
'row' => $row['row_number'],
'error' => $e->getMessage()
];
}
}
// Bulk insert valid rows (much faster than individual inserts)
if (!empty($validRows)) {
Product::upsert(
$validRows,
['sku'], // Unique key for upsert
['name', 'price', 'quantity', 'updated_at'] // Columns to update
);
}
// Update progress atomically
$this->import->increment('processed_rows', count($validRows));
$this->import->increment('failed_rows', count($errors));
if (!empty($errors)) {
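// Note: unlike the increments above, this read-merge-write on the errors column
// is not atomic; with many parallel workers, consider a separate import_errors table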
$existingErrors = $this->import->errors ?? [];
$this->import->update([
'errors' => array_merge($existingErrors, $errors)
]);
}
// Check if import is complete
$this->checkCompletion();
}
private function checkCompletion()
{
$this->import->refresh();
$totalProcessed = $this->import->processed_rows + $this->import->failed_rows;
if ($totalProcessed >= $this->import->total_rows) {
$this->import->update([
'status' => 'completed',
'completed_at' => now()
]);
}
}
}
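One piece not shown above is validateRow(), because it depends entirely on your manifest format. A minimal sketch, assuming the sku, name, price, and quantity columns the upsert call expects, and assuming Laravel's Validator facade:

// A hypothetical validateRow() matching the columns used in the upsert
// (assumes: use Illuminate\Support\Facades\Validator;)
private function validateRow(array $data): array
{
    // Throws a ValidationException (caught by handle() above) when a row is invalid;
    // Eloquent's upsert() fills created_at / updated_at itself when timestamps are enabled
    return Validator::make($data, [
        'sku' => 'required|string|max:64',
        'name' => 'required|string|max:255',
        'price' => 'required|numeric|min:0',
        'quantity' => 'required|integer|min:0',
    ])->validate();
}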
The Performance Difference
Here's what this architecture achieves:
| Metric | Before (Sync) | After (Queue) |
|---|---|---|
| 10K row import | 20+ minutes (often timeout) | <3 minutes |
| HTTP response time | 20+ minutes (blocking) | <500ms |
| Memory usage | Crashes at 2K rows | Stable (chunks) |
| Failure recovery | Start over | Resume from failure |
| User experience | "Is it working?" | Real-time progress |
Production Tips
1. Use Redis for Queues
Don't use the database queue driver for high-volume imports. Redis is faster and won't lock your database during heavy processing:
# .env
QUEUE_CONNECTION=redis
// config/queue.php - give long-running chunk jobs more time before they're retried
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 300, // 5 minutes for long-running chunks
'block_for' => null,
],
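Redis only stores the jobs; you still need worker processes consuming them, typically kept alive by Supervisor or Horizon in production. The flags below are illustrative rather than prescriptive:

# Run several of these so chunks are processed in parallel
php artisan queue:work redis --tries=3 --timeout=290   # keep --timeout below retry_after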
2. Use Upsert, Not Insert
If rows might already exist (re-importing, updates), use upsert() instead of insert(). It's a single query that handles both insert and update:
// Instead of checking exists + insert/update
Product::upsert(
$validRows, // Data
['sku'], // Unique key
['name', 'price', 'quantity', 'updated_at'] // Columns to update if exists
);
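One caveat: upsert() relies on the database to detect duplicates, so the columns in the second argument need a unique index. A one-line sketch for the products migration, assuming sku is the natural key:

// In the products table migration
$table->string('sku')->unique(); // without this, the database has nothing to conflict on and rows duplicate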
3. Add a Status Endpoint
Let users poll for progress:
// GET /api/imports/{id}
public function show(CsvImport $import)
{
return response()->json([
'status' => $import->status,
'progress' => [
'total' => $import->total_rows,
'processed' => $import->processed_rows,
'failed' => $import->failed_rows,
'percentage' => round(
($import->processed_rows + $import->failed_rows)
/ max(1, $import->total_rows) * 100
)
],
'errors' => $import->errors,
'started_at' => $import->started_at,
'completed_at' => $import->completed_at,
]);
}
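For completeness, the two endpoints map to routes along these lines; the paths and implicit model binding are assumptions about your routing setup:

// routes/api.php
Route::post('/imports', [CsvImportController::class, 'store']);
Route::get('/imports/{import}', [CsvImportController::class, 'show']); // implicit binding to CsvImport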
4. Handle Worker Failures
Jobs can fail. Make them idempotent and retriable:
class ProcessCsvChunk implements ShouldQueue
{
public $tries = 3; // Retry up to 3 times
public $backoff = [30, 60]; // Wait 30s, then 60s between retries
public function failed(\Throwable $exception)
{
// Log failure, notify admin, update import status
$this->import->update(['status' => 'failed']);
Log::error('CSV chunk failed', [
'import_id' => $this->import->id,
'error' => $exception->getMessage()
]);
}
}
When to Use This Pattern
Queue-driven imports make sense when:
- Files exceed 1,000 rows — chunking prevents memory issues
- Processing takes >30 seconds — async prevents HTTP timeouts
- Imports happen frequently — users need progress visibility
- Data validation is complex — error tracking becomes essential
- External APIs are involved — rate limiting and retries are easier in jobs
For small imports (<500 rows, simple validation), synchronous processing is fine. Don't over-engineer.
The Results
For our retail client managing 500,000+ SKUs across multiple locations, the payoff is simple: their warehouse team now imports brand manifests confidently. No more "did it work?" No more re-uploading files. No more crashed systems right before big events.
Need Help?
If your CSV imports are crashing, timing out, or taking forever — we've solved this problem multiple times.
Book a 30-minute call and we'll look at your specific situation. No pitch — just honest advice on whether queue-driven architecture makes sense for your use case.