Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions src/java/org/apache/cassandra/cql3/ResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,27 @@ public class ResultSet
public static final Codec codec = new Codec();

public final ResultMetadata metadata;
public final List<List<ByteBuffer>> rows;
public final List<List<byte[]>> rows;

public ResultSet(ResultMetadata resultMetadata)
{
this(resultMetadata, new ArrayList<List<ByteBuffer>>());
this(resultMetadata, new ArrayList<List<byte[]>>());
}

public ResultSet(ResultMetadata resultMetadata, List<List<ByteBuffer>> rows)
public ResultSet(ResultMetadata resultMetadata, List<List<byte[]>> rows)
{
this.metadata = resultMetadata;
this.rows = rows;
}

public static ResultSet fromByteBufferRows(ResultMetadata resultMetadata, List<List<ByteBuffer>> bbRows)
{
List<List<byte[]>> converted = new ArrayList<>(bbRows.size());
for (List<ByteBuffer> bbRow : bbRows)
converted.add(convertByteBufferList(bbRow));
return new ResultSet(resultMetadata, converted);
}

public int size()
{
return rows.size();
Expand All @@ -69,21 +77,40 @@ public boolean isEmpty()
return size() == 0;
}

public void addRow(List<ByteBuffer> row)
public void addRow(List<byte[]> row)
{
assert row.size() == metadata.valueCount();
rows.add(row);
}

public void addColumnValue(ByteBuffer value)
public void addByteBufferRow(List<ByteBuffer> row)
{
assert row.size() == metadata.valueCount();
rows.add(convertByteBufferList(row));
}

private static List<byte[]> convertByteBufferList(List<ByteBuffer> row)
{
List<byte[]> converted = new ArrayList<>(row.size());
for (ByteBuffer bb : row)
converted.add(ByteBufferUtil.getArrayUnsafeNullable(bb));
return converted;
}

public void addColumnValue(byte[] value)
{
if (rows.isEmpty() || lastRow().size() == metadata.valueCount())
rows.add(new ArrayList<ByteBuffer>(metadata.valueCount()));
rows.add(new ArrayList<byte[]>(metadata.valueCount()));

lastRow().add(value);
}

private List<ByteBuffer> lastRow()
public void addColumnValue(ByteBuffer value)
{
addColumnValue(ByteBufferUtil.getArrayUnsafeNullable(value));
}

private List<byte[]> lastRow()
{
return rows.get(rows.size() - 1);
}
Expand All @@ -110,11 +137,11 @@ public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append(metadata).append('\n');
for (List<ByteBuffer> row : rows)
for (List<byte[]> row : rows)
{
for (int i = 0; i < row.size(); i++)
{
ByteBuffer v = row.get(i);
byte[] v = row.get(i);
if (v == null)
{
sb.append(" | null");
Expand All @@ -123,9 +150,9 @@ public String toString()
{
sb.append(" | ");
if (metadata.flags.contains(Flag.NO_METADATA))
sb.append("0x").append(ByteBufferUtil.bytesToHex(v));
sb.append("0x").append(ByteBufferUtil.bytesToHex(ByteBuffer.wrap(v)));
else
sb.append(metadata.names.get(i).type.getString(v));
sb.append(metadata.names.get(i).type.getString(ByteBuffer.wrap(v)));
}
}
sb.append('\n');
Expand All @@ -151,12 +178,12 @@ public ResultSet decode(ByteBuf body, ProtocolVersion version)
{
ResultMetadata m = ResultMetadata.codec.decode(body, version);
int rowCount = body.readInt();
ResultSet rs = new ResultSet(m, new ArrayList<List<ByteBuffer>>(rowCount));
ResultSet rs = new ResultSet(m, new ArrayList<List<byte[]>>(rowCount));

// rows
int totalValues = rowCount * m.columnCount;
for (int i = 0; i < totalValues; i++)
rs.addColumnValue(CBUtil.readValue(body));
rs.addColumnValue(CBUtil.readValueAsBytes(body));

return rs;
}
Expand All @@ -165,7 +192,7 @@ public void encode(ResultSet rs, ByteBuf dest, ProtocolVersion version)
{
ResultMetadata.codec.encode(rs.metadata, dest, version);
dest.writeInt(rs.rows.size());
for (List<ByteBuffer> row : rs.rows)
for (List<byte[]> row : rs.rows)
{
// Note that we do only want to serialize only the first columnCount values, even if the row
// as more: see comment on ResultMetadata.names field.
Expand All @@ -177,7 +204,7 @@ public void encode(ResultSet rs, ByteBuf dest, ProtocolVersion version)
public int encodedSize(ResultSet rs, ProtocolVersion version)
{
int size = ResultMetadata.codec.encodedSize(rs.metadata, version) + 4;
for (List<ByteBuffer> row : rs.rows)
for (List<byte[]> row : rs.rows)
{
for (int i = 0; i < rs.metadata.columnCount; i++)
size += CBUtil.sizeOfValue(row.get(i));
Expand Down
26 changes: 21 additions & 5 deletions src/java/org/apache/cassandra/cql3/UntypedResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public Iterator<Row> iterator()
{
return new AbstractIterator<Row>()
{
final Iterator<List<ByteBuffer>> iter = cqlRows.rows.iterator();
final Iterator<List<byte[]>> iter = cqlRows.rows.iterator();

protected Row computeNext()
{
Expand Down Expand Up @@ -176,7 +176,7 @@ public Iterator<Row> iterator()
{
return new AbstractIterator<Row>()
{
private Iterator<List<ByteBuffer>> currentPage;
private Iterator<List<byte[]>> currentPage;

protected Row computeNext()
{
Expand Down Expand Up @@ -242,7 +242,7 @@ public Iterator<Row> iterator()
{
return new AbstractIterator<Row>()
{
private Iterator<List<ByteBuffer>> currentPage;
private Iterator<List<byte[]>> currentPage;

protected Row computeNext()
{
Expand Down Expand Up @@ -275,11 +275,27 @@ public static class Row
@Nonnull
private final List<ColumnSpecification> columns;

public Row(@Nonnull List<ColumnSpecification> names, @Nonnull List<ByteBuffer> columns)
public Row(@Nonnull List<ColumnSpecification> names, @Nonnull List<byte[]> columns)
{
this.columns = ImmutableList.copyOf(names);
for (int i = 0; i < names.size(); i++)
data.put(names.get(i).name.toString(), columns.get(i));
{
byte[] v = columns.get(i);
data.put(names.get(i).name.toString(), v == null ? null : ByteBuffer.wrap(v));
}
}

public static Row fromByteBuffers(@Nonnull List<ColumnSpecification> names, @Nonnull List<ByteBuffer> columns)
{
Row row = new Row(names);
for (int i = 0; i < names.size(); i++)
row.data.put(names.get(i).name.toString(), columns.get(i));
return row;
}

private Row(@Nonnull List<ColumnSpecification> names)
{
this.columns = ImmutableList.copyOf(names);
}

public boolean has(String column)
Expand Down
15 changes: 10 additions & 5 deletions src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public ResultSetBuilder(ResultMetadata metadata, Selectors selectors, boolean un
this.unmask = unmask;
}

private void addSize(List<ByteBuffer> row)
private void addSize(List<byte[]> row)
{
for (int i=0, isize=row.size(); i<isize; i++)
{
ByteBuffer value = row.get(i);
size += value != null ? value.remaining() : 0;
byte[] value = row.get(i);
size += value != null ? value.length : 0;
}
}

Expand Down Expand Up @@ -107,6 +107,11 @@ public void add(ByteBuffer v)
inputRow.add(v);
}

public void add(byte[] v)
{
inputRow.add(v);
}

public void add(Cell<?> c, long nowInSec)
{
inputRow.add(c, nowInSec);
Expand Down Expand Up @@ -170,9 +175,9 @@ public ResultSet build()
return resultSet;
}

private List<ByteBuffer> getOutputRow()
private List<byte[]> getOutputRow()
{
List<ByteBuffer> row = selectors.getOutputRow();
List<byte[]> row = selectors.getOutputRow();
addSize(row);
return row;
}
Expand Down
37 changes: 19 additions & 18 deletions src/java/org/apache/cassandra/cql3/selection/Selection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.cassandra.cql3.selection;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -39,11 +38,13 @@
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.selection.Selector.InputRow;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JsonUtils;

import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;
Expand Down Expand Up @@ -318,22 +319,22 @@ public String toString()
.toString();
}

private static List<ByteBuffer> rowToJson(List<ByteBuffer> row,
ProtocolVersion protocolVersion,
ResultSet.ResultMetadata metadata,
List<ColumnMetadata> orderingColumns)
private static List<byte[]> rowToJson(List<byte[]> row,
ProtocolVersion protocolVersion,
ResultSet.ResultMetadata metadata,
List<ColumnMetadata> orderingColumns)
{
ByteBuffer[] jsonRow = new ByteBuffer[orderingColumns.size() + 1];
byte[][] jsonRow = new byte[orderingColumns.size() + 1][];
StringBuilder sb = new StringBuilder("{");
for (int i = 0; i < metadata.names.size(); i++)
{
ColumnSpecification spec = metadata.names.get(i);
ByteBuffer buffer = row.get(i);
byte[] value = row.get(i);

// If it is an ordering column we need to keep it in case we need it for post ordering
int index = orderingColumns.indexOf(spec);
if (index >= 0)
jsonRow[index + 1] = buffer;
jsonRow[index + 1] = value;

// If the column is only used for ordering we can stop here.
if (i >= metadata.getColumnCount())
Expand All @@ -349,14 +350,14 @@ private static List<ByteBuffer> rowToJson(List<ByteBuffer> row,
sb.append('"');
sb.append(JsonUtils.quoteAsJsonString(columnName));
sb.append("\": ");
if (buffer == null)
if (value == null)
sb.append("null");
else
sb.append(spec.type.toJSONString(buffer, protocolVersion));
sb.append(spec.type.toJSONString(value, ByteArrayAccessor.instance, protocolVersion));
}
sb.append("}");

jsonRow[0] = UTF8Type.instance.getSerializer().serialize(sb.toString());
jsonRow[0] = ByteBufferUtil.getArrayUnsafeNullable(UTF8Type.instance.getSerializer().serialize(sb.toString()));
return Arrays.asList(jsonRow);
}

Expand Down Expand Up @@ -400,14 +401,14 @@ public interface Selectors
*/
void addInputRow(InputRow input);

List<ByteBuffer> getOutputRow();
List<byte[]> getOutputRow();

void reset();
}

public static class SimpleSelectors implements Selectors
{
protected List<ByteBuffer> current;
protected List<byte[]> current;

@Override
public void addInputRow(InputRow input)
Expand All @@ -416,7 +417,7 @@ public void addInputRow(InputRow input)
}

@Override
public List<ByteBuffer> getOutputRow()
public List<byte[]> getOutputRow()
{
return current;
}
Expand Down Expand Up @@ -500,7 +501,7 @@ public Selectors newSelectors(QueryOptions options)
return new SimpleSelectors()
{
@Override
public List<ByteBuffer> getOutputRow()
public List<byte[]> getOutputRow()
{
if (isJson)
return rowToJson(current, options.getProtocolVersion(), metadata, orderingColumns);
Expand Down Expand Up @@ -591,12 +592,12 @@ public boolean hasProcessing()
return true;
}

public List<ByteBuffer> getOutputRow()
public List<byte[]> getOutputRow()
{
List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
List<byte[]> outputRow = new ArrayList<>(selectors.size());

for (Selector selector: selectors)
outputRow.add(selector.getOutput(options.getProtocolVersion()));
outputRow.add(selector.getOutputAsBytes(options.getProtocolVersion()));

return isJson ? rowToJson(outputRow, options.getProtocolVersion(), metadata, orderingColumns) : outputRow;
}
Expand Down
Loading