1- import itertools
21import logging
32from typing import Iterable , Optional
43
5- import dnaio
64import duckdb
7- import pyarrow
5+ import oxbow
86
97from countess import VERSION
108from countess .core .parameters import BaseParam , BooleanParam , FloatParam
@@ -32,45 +30,31 @@ class LoadFastqPlugin(DuckdbLoadFileWithTheLotPlugin):
3230 def load_file (
3331 self , cursor : duckdb .DuckDBPyConnection , filename : str , file_param : BaseParam , row_limit : Optional [int ] = None
3432 ) -> duckdb .DuckDBPyRelation :
35- # Open the file, convert it to a RecordBatchReader and then
36- # wrap that up as a DuckDBPyRelation so we can filter it.
3733 logger .debug ("Loading file %s row_limit %s" , filename , row_limit )
3834
39- # Take up to row_limit records from this file
40- fastq_iter = itertools .islice (dnaio .open (filename , open_threads = 1 ), row_limit )
35+ fields = ['sequence' ]
36+ if self .min_avg_quality :
37+ fields .append ('quality' )
38+ if self .header_column :
39+ fields .append ('name' )
40+
41+ rel = oxbow .from_fastq (filename , fields = fields ).to_duckdb (cursor )
4142
42- def _record_to_dict (record ):
43- d = {"sequence" : record .sequence }
44- if self .header_column :
45- d ["header" ] = record .name
46- return d
43+ if row_limit :
44+ rel = rel .limit (row_limit )
4745
48- def _avg_quality (record ):
49- return sum (ord (c ) for c in record .qualities ) / len (record .qualities ) - 33
46+ if self .min_avg_quality :
47+ filt = "list_avg(list_transform(split(quality,''), lambda x: ord(x))) >= %d" % (self .min_avg_quality + 33 )
48+ rel = rel .filter (filt )
5049
51- pyarrow_schema = pyarrow .schema ([pyarrow .field ("sequence" , pyarrow .string ())])
5250 if self .header_column :
53- pyarrow_schema .append (pyarrow .field ("header" , pyarrow .string ()))
54-
55- # Generator which batches records 5000 at a time into RecordBatches
56- record_batch_iter = (
57- pyarrow .RecordBatch .from_pylist (
58- [
59- _record_to_dict (record )
60- for record in batch
61- if self .min_avg_quality <= 0 or self .min_avg_quality <= _avg_quality (record )
62- ]
63- )
64- for batch in itertools .batched (fastq_iter , 5000 )
65- )
66-
67- # We can turn that generator of RecordBatches into a temporary table
68- rel = cursor .from_arrow (pyarrow .RecordBatchReader .from_batches (pyarrow_schema , record_batch_iter ))
51+ rel = rel .project ("sequence, name as header" )
52+ else :
53+ rel = rel .project ("sequence" )
6954
7055 if self .group :
7156 rel = rel .aggregate ("sequence, count(*) as count" )
7257
73- logger .debug ("Loading file %s row_limit %s done" , filename , row_limit )
7458 return rel
7559
7660 def combine (
@@ -96,14 +80,5 @@ class LoadFastaPlugin(DuckdbLoadFileWithTheLotPlugin):
9680 def load_file (
9781 self , cursor : duckdb .DuckDBPyConnection , filename : str , file_param : BaseParam , row_limit : Optional [int ] = None
9882 ) -> duckdb .DuckDBPyRelation :
99- pyarrow_schema = pyarrow .schema (
100- [pyarrow .field ("sequence" , pyarrow .string ()), pyarrow .field ("header" , pyarrow .string ())]
101- )
102-
103- fasta_iter = itertools .islice (dnaio .open (filename , open_threads = 1 ), row_limit )
104- record_batch_iter = (
105- pyarrow .RecordBatch .from_pylist ([{"sequence" : z .sequence , "header" : z .name } for z in y ])
106- for y in itertools .batched (fasta_iter , 5000 )
107- )
108- rel = cursor .from_arrow (pyarrow .RecordBatchReader .from_batches (pyarrow_schema , record_batch_iter ))
109- return rel
83+ rel = oxbow .from_fasta (filename ).to_duckdb (cursor )
84+ return rel .limit (row_limit ) if row_limit else rel
0 commit comments