-
Notifications
You must be signed in to change notification settings - Fork 1k
Expand file tree
/
Copy pathcj.c
More file actions
102 lines (101 loc) · 4.12 KB
/
cj.c
File metadata and controls
102 lines (101 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include "data.table.h"
/*
OpenMP is used here to parallelize:
- The element assignment in vectors
- The memory copying operations (blockwise replication of data using memcpy)
- The creation of all combinations of the input vectors over the cross-product space
*/
SEXP cj(SEXP base_list)
{
int ncol = LENGTH(base_list);
SEXP out = PROTECT(allocVector(VECSXP, ncol));
int nrow = 1;
// already confirmed to be less than .Machine$integer.max at R level
for (int j = 0; j < ncol; j++) nrow *= length(VECTOR_ELT(base_list, j));
int eachrep = 1;
for (int j = ncol - 1; j >= 0; j--) {
SEXP source = VECTOR_ELT(base_list, j), target;
SET_VECTOR_ELT(out, j, target = allocVector(TYPEOF(source), nrow));
copyMostAttrib(source, target); // includes levels of factors, integer64, custom classes, etc
if (nrow == 0) continue; // one or more columns are empty so the result will be empty, #2511
int thislen = LENGTH(source);
int blocklen = thislen * eachrep;
int ncopy = nrow / blocklen;
switch(TYPEOF(source)) {
case LGLSXP:
case INTSXP: {
const int *restrict sourceP = INTEGER_RO(source);
int *restrict targetP = INTEGER(target);
#pragma omp parallel for num_threads(getDTthreads(thislen*eachrep, true))
// default static schedule so two threads won't write to same cache line in last column
// if they did write to same cache line (and will when last column's thislen is small) there's no correctness issue
for (int i = 0; i < thislen; i++) {
const int item = sourceP[i];
const int end = (i + 1) * eachrep;
for (int j = i * eachrep; j < end; j++) targetP[j] = item; // no div, mod or read ops inside loop; just rep a const contiguous write
}
#pragma omp parallel for num_threads(getDTthreads(ncopy*blocklen, true))
for (int i = 1; i < ncopy; i++) {
memcpy(targetP + i * blocklen, targetP, blocklen * sizeof(*targetP));
}
} break;
case REALSXP: {
const double *restrict sourceP = REAL_RO(source);
double *restrict targetP = REAL(target);
#pragma omp parallel for num_threads(getDTthreads(thislen*eachrep, true))
for (int i = 0; i < thislen; i++) {
const double item = sourceP[i];
const int end = (i + 1) * eachrep;
for (int j = i * eachrep; j < end; j++) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads(ncopy*blocklen, true))
for (int i = 1; i < ncopy; i++) {
memcpy(targetP + i * blocklen, targetP, blocklen * sizeof(double));
}
} break;
case CPLXSXP: {
const Rcomplex *restrict sourceP = COMPLEX_RO(source);
Rcomplex *restrict targetP = COMPLEX(target);
#pragma omp parallel for num_threads(getDTthreads(thislen*eachrep, true))
for (int i = 0; i < thislen; i++) {
const Rcomplex item = sourceP[i];
const int end = (i + 1) * eachrep;
for (int j = i * eachrep; j < end; j++) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads(ncopy*blocklen, true))
for (int i = 1; i < ncopy; i++) {
memcpy(targetP + i * blocklen, targetP, blocklen * sizeof(Rcomplex));
}
} break;
case STRSXP: {
const SEXP *sourceP = STRING_PTR_RO(source);
int start = 0;
for (int i = 0; i < ncopy; i++) {
for (int j = 0; j < thislen; j++) {
const SEXP item = sourceP[j];
const int end = start + eachrep;
for (int k = start; k < end; k++) SET_STRING_ELT(target, k, item); // no div, mod, or read-API call to STRING_ELT
start = end;
}
}
} break;
case VECSXP: {
const SEXP *sourceP = SEXPPTR_RO(source);
int start = 0;
for (int i = 0; i < ncopy; i++) {
for (int j = 0; j < thislen; j++) {
const SEXP item = sourceP[j];
const int end = start + eachrep;
for (int k = start; k < end; k++) SET_VECTOR_ELT(target, k, item);
start = end;
}
}
} break;
default:
error(_("Type '%s' is not supported by CJ."), type2char(TYPEOF(source)));
}
eachrep *= thislen;
}
UNPROTECT(1);
return out;
}