jhi*_*man 6 c arrays sorting performance
我有一个2D数组,其中每行包含6个已按升序排序的整数.例:
1 2 3 4 5 6
6 8 9 10 13 15
1 4 5 6 7 9
1 4 5 6 7 8
3 18 19 20 25 34
Run Code Online (Sandbox Code Playgroud)
预期产量:
1 2 3 4 5 6
1 4 5 6 7 8
1 4 5 6 7 9
3 18 19 20 25 34
6 8 9 10 13 15
Run Code Online (Sandbox Code Playgroud)
实际数据包含8米到33米这样的记录.我正在尝试确定排序此数组的最快方法.我目前有一些使用qsort的工作代码:
qsort电话:
qsort(allRecords, lineCount, sizeof(int*), cmpfunc);
Run Code Online (Sandbox Code Playgroud)
cmpfunc:
int cmpfunc (const void * a, const void * b)
{
const int *rowA = *(const int **)a;
const int *rowB = *(const int **)b;
if (rowA[0] > rowB[0]) return 1;
if (rowA[0] < rowB[0]) return -1;
if (rowA[1] > rowB[1]) return 1;
if (rowA[1] < rowB[1]) return -1;
if (rowA[2] > rowB[2]) return 1;
if (rowA[2] < rowB[2]) return -1;
if (rowA[3] > rowB[3]) return 1;
if (rowA[3] < rowB[3]) return -1;
if (rowA[4] > rowB[4]) return 1;
if (rowA[4] < rowB[4]) return -1;
if (rowA[5] > rowB[5]) return 1;
if (rowA[5] < rowB[5]) return -1;
return 0;
}
Run Code Online (Sandbox Code Playgroud)
对于样本3300万条记录,它需要大约35.6秒(gcc -O1),这是相当快的,但我想知道是否有任何更快的方法来执行它给定每行中的预排序值.
这自然适用于最常见的第一个数字为1的数据,因此在33m记录文件中,可能有12m记录以1开头,然后8m记录以2开头,5m记录以3开头等等...我是不确定这是否适合某种特定类型的排序而不是另一种(例如heapsort).
我的理解是qsort有相当大的开销,因为它必须调用该函数,所以我希望有一些更快的性能.
我通常不会编写C代码,因此我对建议和批评非常开放,因为我正在将这些内容与教程和其他StackOverflow问题/答案拼凑在一起.
编辑:根据要求,我的初始化代码:
// Empty record
int recArray[6] = {0,0,0,0,0,0};
// Initialize allRecords
int** allRecords;
allRecords = (int**) malloc(lineCount*sizeof(int*));
for(i=0; i < lineCount; i++)
{
allRecords[i] = (int*) malloc(6*sizeof(int));
}
// Zero-out all records
for(i=0; i < lineCount; i++)
{
memcpy(allRecords[i], recArray, 6 * sizeof(int));
}
Run Code Online (Sandbox Code Playgroud)
我还在学习正确的做事方式,所以如果我做错了,我也不会感到惊讶.正确的做法的指导将不胜感激.
其他人询问了价值范围 - 我不确定未来范围是否会发生变化,但目前这个数值介于1到99之间.
另外,对于分析 - 我构建了一个小函数,它使用gettimeofday()来拉秒/微秒,然后比较之前和之后.我对更好的方式持开放态度.输出如下:
// <-- Here I capture the gettimeofday() structure output
Sorting...
Sorted.
Time Taken: 35.628882s // <-- Capture it again, show the difference
Run Code Online (Sandbox Code Playgroud)
编辑:Per @doynax - 我现在将每行的6个值"打包"到unsigned long long int中:
// Initialize allRecords
unsigned long long int* allRecords;
allRecords = (unsigned long long int*) malloc(lineCount*sizeof(unsigned long long int));
for(i=0; i < lineCount; i++)
{
allRecords[i] = 0;
}
...
// "Pack" current value (n0) into an unsigned long long int
if(recPos == 0) { lineSum += n0 * UINT64_C(1); }
else if(recPos == 1) { lineSum += n0 * UINT64_C(100); }
else if(recPos == 2) { lineSum += n0 * UINT64_C(10000); }
else if(recPos == 3) { lineSum += n0 * UINT64_C(1000000); }
else if(recPos == 4) { lineSum += n0 * UINT64_C(100000000); }
else if(recPos == 5) { lineSum += n0 * UINT64_C(10000000000); }
...
allRecords[linecount] = lineSum;
lineSum = 0;
Run Code Online (Sandbox Code Playgroud)
我也可以稍后将这些无符号long long int值中的一个"解包"成原始的6个整数.
但是,当我尝试排序时:
qsort(allRecords, lineCount, sizeof(unsigned long long int), cmpfunc);
...
int cmpfunc (const void * a, const void * b)
{
if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
return 0;
}
Run Code Online (Sandbox Code Playgroud)
...结果未按预期排序.如果我使用以下方法显示排序前后的第一行和最后一行:
printf("[%i] = %llu = %i,%i,%i,%i,%i,%i\n", j, lineSum, recArray[0]...recArray[5]);
Run Code Online (Sandbox Code Playgroud)
输出是:
First and last 5 rows before sorting:
[#] = PACKED INT64 = UNPACKED
[0] = 462220191706 = 6,17,19,20,22,46
[1] = 494140341005 = 5,10,34,40,41,49
[2] = 575337201905 = 5,19,20,37,53,57
[3] = 504236262316 = 16,23,26,36,42,50
[4] = 534730201912 = 12,19,20,30,47,53
[46] = 595648302516 = 16,25,30,48,56,59
[47] = 453635251108 = 8,11,25,35,36,45
[48] = 403221161202 = 2,12,16,21,32,40
[49] = 443736310604 = 4,6,31,36,37,44
[50] = 575248312821 = 21,28,31,48,52,57
First and last 5 rows after sorting:
[0] = 403221161202 = 2,12,16,21,32,40
[1] = 413218141002 = 2,10,14,18,32,41
[2] = 443736310604 = 4,6,31,36,37,44
[3] = 444127211604 = 4,16,21,27,41,44
[4] = 453028070302 = 2,3,7,28,30,45
[46] = 585043260907 = 7,9,26,43,50,58
[47] = 593524170902 = 2,9,17,24,35,59
[48] = 595248392711 = 11,27,39,48,52,59
[49] = 595251272612 = 12,26,27,51,52,59
[50] = 595648302516 = 16,25,30,48,56,59
Run Code Online (Sandbox Code Playgroud)
我猜我是在某种程度上比较错误的值(例如指针值而不是实际值),但我不太确定正确的语法是什么.
从好的方面来说,这种方式很快.:)
对33m 64位整数进行排序大约需要4-5秒(至少在其当前的错误形式中).
我无法抗拒一个好的优化挑战。
初步想法
我看到这个问题的第一个想法是使用基数排序,特别是带有内部计数排序的基数排序。(很高兴看到一些评论也同意我的看法!)我在其他项目中使用基数排序和内部计数排序,虽然快速排序可以在小数据集上胜过它,但它让快速排序在大数据集上落伍数据集。
我选择基数排序是因为像快速排序这样的比较排序有一个众所周知的 O( n lg n )性能限制。因为n在这里非常大,所以“lg n ”部分也非常大——对于 3300 万条记录,你可以合理地预期它需要“一些恒定时间”乘以 33000000 乘以 lg(33000000),而 lg(33000000) 大约是25. 像基数排序和计数排序这样的排序是“非比较”排序,所以他们可以通过对数据的特殊知识来削减 O( n lg n ) 边界——在这种情况下,他们可以走得更快,因为我们知道数据是小整数。
我使用了与其他人一样的将int值混合在一起的代码,将每组值组合成一个BigInt.
我的实现(见下文)执行一些恒定时间设置计算,然后它对数据精确迭代 7 次以对其进行排序——“7 次”不仅在算法上击败了“25 次”,而且我的实现很小心避免在不必要的情况下更频繁地接触内存,以便它也能像 CPU 缓存一样正常播放。
对于合理的源数据,我编写了一个小程序,它生成了 3300 万条看起来有点像原始数据的记录。每行都按排序顺序排列,并略微偏向于较小的值。(您可以在sixsample.c下面找到我的示例数据集程序。)
性能比较
使用与您的qsort()实现非常接近的东西,我通过三轮运行获得了这些数字,并且它们与您获得的数字相差不远(显然,不同的 CPU):
qsort:
Got 33000000 lines. Sorting.
Sorted lines in 5 seconds and 288636 microseconds.
Sorted lines in 5 seconds and 415553 microseconds.
Sorted lines in 5 seconds and 242454 microseconds.
Run Code Online (Sandbox Code Playgroud)
使用我的 radix-sort-with-counting-sort 实现,我在三轮运行中得到了这些数字:
Radix sort with internal counting sort, and cache-optimized:
Got 33000000 lines. Sorting.
Sorted lines in 0 seconds and 749285 microseconds.
Sorted lines in 0 seconds and 833474 microseconds.
Sorted lines in 0 seconds and 761943 microseconds.
Run Code Online (Sandbox Code Playgroud)
5.318 秒与 0.781 秒对相同的 3300 万条记录进行排序。快了 6.8 倍!
可以肯定的是,我diff编辑了每次运行的输出以保证结果相同——你真的可以在一秒钟内对数千万条记录进行排序!
深潜
基数排序和计数排序的基本原理在维基百科和我最喜欢的一本书《算法导论》中都有很好的描述,所以我不会在这里重复基础知识。相反,我将描述一些关于我的实现与教科书形式的不同之处,以便更快地运行。
这些算法的正常实现是这样的(在 Python 风格的伪代码中):
def radix-sort(records):
temp = []
for column = 0 to columns:
counting-sort(records, temp, column)
copy 'temp' back into 'records'
def counting-sort(records, dest, column):
# Count up how many of each value there is at this column.
counts = []
foreach record in records:
counts[record[column]] += 1
transform 'counts' into offsets
copy from 'records' to 'dest' using the calculated offsets
Run Code Online (Sandbox Code Playgroud)
这当然有效,但它带来了一些丑陋。“对值进行计数”步骤涉及对整个数据集中每一列的值进行计数。“将临时数据复制回记录”步骤涉及为每一列复制整个数据集。这些是“按排序顺序从记录复制到目标”所需步骤的补充。我们为每一列旋转数据 3 次;少这样做会很好!
我的实现将这些混合在一起:不是单独计算每列的计数,而是将它们作为第一遍数据一起计算。然后将计数批量转换为偏移量;最后,数据被排序到位。此外,我temp没有records在每个阶段从到复制,而是简单地翻页:每隔一次迭代,数据就会交替“来自”的地方。我的伪代码更像是这样的:
def radix-sort(records):
# Count up how many of each value there is in every column,
# reading each record only once.
counts = [,]
foreach record in records:
foreach column in columns:
counts[column, record[column]] += 1
transform 'counts' for each column into offsets for each column
temp = []
for column = 0 to columns step 2:
sort-by-counts(records, temp, column)
sort-by-counts(temp, records, column+1)
def sort-by-counts(records, dest, counts, column):
copy from 'records' to 'dest' using the calculated offsets
Run Code Online (Sandbox Code Playgroud)
这避免了不必要的数据旋转:一次设置,六次排序,所有这些都是有序的。你不能让 CPU 缓存更快乐。
C 代码
这是我对sixsort.c. 它还包括您的qsort解决方案,已注释掉,因此您可以更轻松地比较两者。它包含大量文档,以及所有 I/O 和指标代码,所以它有点长,但它的完整性应该可以帮助您更好地理解解决方案:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/time.h>
/* Configuration. */
#define NUM_COLUMNS 6 /* How many columns to read/sort */
#define MAX_VALUE 100 /* Maximum value allowed in each column, plus one */
#define BITSHIFT 7 /* How far to shift bits when combining values */
/* Note: BITSHIFT should be as small as possible, but MAX_VALUE must be <= 2**BITSHIFT. */
/* Highest value representable under the given BITSHIFT. */
#define BITMASK ((1 << BITSHIFT) - 1)
/* The type we're going to pack the integers into. The highest bits will get the
leftmost number, while the lowest bits will get the rightmost number. */
typedef unsigned long long BigInt;
/*-------------------------------------------------------------------------------------------------
** Radix Sorting.
*/
/* Move a set of items from src to dest, using the provided counts to decide where each
item belongs. src and dest must not be the same arrays. */
static void SortByCounts(BigInt *src, BigInt *dest, size_t *counts, size_t totalCount, int bitshift)
{
BigInt *temp, *end;
for (temp = src, end = src + totalCount; temp < end; temp++) {
BigInt value = *temp;
int number = (int)((value >> bitshift) & BITMASK);
int offset = counts[number]++;
dest[offset] = value;
}
}
/* Use a radix sort with an internal counting sort to sort the given array of values.
This iterates over the source data exactly 7 times, which for large
'count' (i.e., count > ~2000) is typically much more efficient than O(n lg n)
algorithms like quicksort or mergesort or heapsort. (That said, the recursive O(n lg n)
algorithms typically have better memory locality, so which solution wins overall
may vary depending on your CPU and memory system.) */
static void RadixSortWithInternalCountingSort(BigInt *values, size_t count)
{
size_t i, j;
BigInt *temp, *end;
size_t counts[NUM_COLUMNS][MAX_VALUE];
size_t oldCount;
/* Reset the counts of each value in each column to zero, quickly.
This takes MAX_VALUE * NUM_COLUMNS time (i.e., constant time). */
memset(counts, 0, sizeof(counts));
/* Count up how many there are of each value in this column. This iterates over
the whole dataset exactly once, processing each set of values in full, so it
takes COUNT * NUM_COLUMNS operations, or theta(n). */
for (temp = values, end = values + count; temp < end; temp++) {
BigInt value = *temp;
for (i = 0; i < NUM_COLUMNS; i++) {
counts[i][(int)((value >> (i * BITSHIFT)) & BITMASK)]++;
}
}
/* Transform the counts into offsets. This only transforms the counts array,
so it takes MAX_VALUE * NUM_COLUMNS operations (i.e., constant time). */
size_t totals[NUM_COLUMNS];
for (i = 0; i < NUM_COLUMNS; i++) {
totals[i] = 0;
}
for (i = 0; i < MAX_VALUE; i++) {
for (j = 0; j < NUM_COLUMNS; j++) {
oldCount = counts[j][i];
counts[j][i] = totals[j];
totals[j] += oldCount;
}
}
temp = malloc(sizeof(BigInt) * count);
/* Now perform the actual sorting, using the counts to tell us how to move
the items. Each call below iterates over the whole dataset exactly once,
so this takes COUNT * NUM_COLUMNS operations, or theta(n). */
for (i = 0; i < NUM_COLUMNS; i += 2) {
SortByCounts(values, temp, counts[i ], count, i * BITSHIFT);
SortByCounts(temp, values, counts[i+1], count, (i+1) * BITSHIFT);
}
free(temp);
}
/*-------------------------------------------------------------------------------------------------
** Built-in Quicksorting.
*/
static int BigIntCompare(const void *a, const void *b)
{
BigInt av = *(BigInt *)a;
BigInt bv = *(BigInt *)b;
if (av < bv) return -1;
if (av > bv) return +1;
return 0;
}
static void BuiltInQuicksort(BigInt *values, size_t count)
{
qsort(values, count, sizeof(BigInt), BigIntCompare);
}
/*-------------------------------------------------------------------------------------------------
** File reading.
*/
/* Read a single integer from the given string, skipping initial whitespace, and
store it in 'returnValue'. Returns a pointer to the end of the integer text, or
NULL if no value can be read. */
static char *ReadInt(char *src, int *returnValue)
{
char ch;
int value;
/* Skip whitespace. */
while ((ch = *src) <= 32 && ch != '\r' && ch != '\n') src++;
/* Do we have a valid digit? */
if ((ch = *src++) < '0' || ch > '9')
return NULL;
/* Collect digits into a number. */
value = 0;
do {
value *= 10;
value += ch - '0';
} while ((ch = *src++) >= '0' && ch <= '9');
src--;
/* Return what we did. */
*returnValue = value;
return src;
}
/* Read a single line of values from the input into 'line', and return the number of
values that were read on this line. */
static int ReadLine(FILE *fp, BigInt *line)
{
int numValues;
char buffer[1024];
char *src;
int value;
BigInt result = 0;
if (fgets(buffer, 1024, fp) == NULL) return 0;
buffer[1023] = '\0';
numValues = 0;
src = buffer;
while ((src = ReadInt(src, &value)) != NULL) {
result |= ((BigInt)value << ((NUM_COLUMNS - ++numValues) * BITSHIFT));
}
*line = result;
return numValues;
}
/* Read from file 'fp', which should consist of a sequence of lines with
six decimal integers on each, and write the parsed, packed integer values
to a newly-allocated 'values' array. Returns the number of lines read. */
static size_t ReadInputFile(FILE *fp, BigInt **values)
{
BigInt line;
BigInt *dest;
size_t count, max;
count = 0;
dest = malloc(sizeof(BigInt) * (max = 256));
while (ReadLine(fp, &line)) {
if (count >= max) {
size_t newmax = max * 2;
BigInt *temp = malloc(sizeof(BigInt) * newmax);
memcpy(temp, dest, sizeof(BigInt) * count);
free(dest);
max = newmax;
dest = temp;
}
dest[count++] = line;
}
*values = dest;
return count;
}
/*-------------------------------------------------------------------------------------------------
** File writing.
*/
/* Given a number from 0 to 999 (inclusive), write its string representation to 'dest'
as fast as possible. Returns 'dest' incremented by the number of digits written. */
static char *WriteNumber(char *dest, unsigned int number)
{
if (number >= 100) {
dest += 3;
dest[-1] = '0' + number % 10, number /= 10;
dest[-2] = '0' + number % 10, number /= 10;
dest[-3] = '0' + number % 10;
}
else if (number >= 10) {
dest += 2;
dest[-1] = '0' + number % 10, number /= 10;
dest[-2] = '0' + number % 10;
}
else {
dest += 1;
dest[-1] = '0' + number;
}
return dest;
}
/* Write a single "value" (one line of content) to the output file. */
static void WriteOutputLine(FILE *fp, BigInt value)
{
char buffer[1024];
char *dest = buffer;
int i;
for (i = 0; i < NUM_COLUMNS; i++) {
if (i > 0)
*dest++ = ' ';
int number = (value >> (BITSHIFT * (NUM_COLUMNS - i - 1))) & BITMASK;
dest = WriteNumber(dest, (unsigned int)number);
}
*dest++ = '\n';
*dest = '\0';
fwrite(buffer, 1, dest - buffer, fp);
}
/* Write the entire output file as a sequence of values in columns. */
static void WriteOutputFile(FILE *fp, BigInt *values, size_t count)
{
while (count-- > 0) {
WriteOutputLine(fp, *values++);
}
}
/*-------------------------------------------------------------------------------------------------
** Timeval support.
*/
int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)
{
/* Perform the carry for the later subtraction by updating y. */
if (x->tv_usec < y->tv_usec) {
int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
y->tv_usec -= 1000000 * nsec;
y->tv_sec += nsec;
}
if (x->tv_usec - y->tv_usec > 1000000) {
int nsec = (y->tv_usec - x->tv_usec) / 1000000;
y->tv_usec += 1000000 * nsec;
y->tv_sec -= nsec;
}
/* Compute the time remaining to wait. tv_usec is certainly positive. */
result->tv_sec = x->tv_sec - y->tv_sec;
result->tv_usec = x->tv_usec - y->tv_usec;
/* Return 1 if result is negative. */
return x->tv_sec < y->tv_sec;
}
/*-------------------------------------------------------------------------------------------------
** Main.
*/
int main(int argc, char **argv)
{
BigInt *values;
size_t count;
FILE *fp;
struct timeval startTime, endTime, deltaTime;
if (argc != 3) {
fprintf(stderr, "Usage: sixsort input.txt output.txt\n");
return -1;
}
printf("Reading %s...\n", argv[1]);
if ((fp = fopen(argv[1], "r")) == NULL) {
fprintf(stderr, "Unable to open \"%s\" for reading.\n", argv[1]);
return -1;
}
count = ReadInputFile(fp, &values);
fclose(fp);
printf("Got %d lines. Sorting.\n", (int)count);
gettimeofday(&startTime, NULL);
RadixSortWithInternalCountingSort(values, count);
/*BuiltInQuicksort(values, count);*/
gettimeofday(&endTime, NULL);
timeval_subtract(&deltaTime, &endTime, &startTime);
printf("Sorted lines in %d seconds and %d microseconds.\n",
(int)deltaTime.tv_sec, (int)deltaTime.tv_usec);
printf("Writing %d lines to %s.\n", (int)count, argv[2]);
if ((fp = fopen(argv[2], "w")) == NULL) {
fprintf(stderr, "Unable to open \"%s\" for writing.\n", argv[2]);
return -1;
}
WriteOutputFile(fp, values, count);
fclose(fp);
free(values);
return 0;
}
Run Code Online (Sandbox Code Playgroud)
另外,作为参考,这是我的蹩脚小样本数据集生产者sixsample.c,使用可预测的 LCG 随机数生成器,因此它始终生成相同的样本数据:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <math.h>
#define NUM_COLUMNS 6
#define MAX_VALUE 35
unsigned int RandSeed = 0;
/* Generate a random number from 0 to 65535, inclusive, using a simple (fast) LCG.
The numbers below are the same as used in the ISO/IEC 9899 proposal for C99/C11. */
static int Rand()
{
RandSeed = RandSeed * 1103515245 + 12345;
return (int)(RandSeed >> 16);
}
static int CompareInts(const void *a, const void *b)
{
return *(int *)a - *(int *)b;
}
static void PrintSortedRandomLine()
{
int values[NUM_COLUMNS];
int i;
for (i = 0; i < NUM_COLUMNS; i++) {
values[i] = 1 + (int)(pow((double)Rand() / 65536.0, 2.0) * MAX_VALUE);
}
qsort(values, NUM_COLUMNS, sizeof(int), CompareInts);
/* Lame. */
printf("%d %d %d %d %d %d\n",
values[0], values[1], values[2], values[3], values[4], values[5]);
}
int main(int argc, char **argv)
{
unsigned int numLines;
unsigned int i;
if (argc < 2 || argc > 3) {
fprintf(stderr, "Usage: sixsample numLines [randSeed]\n");
return -1;
}
numLines = atoi(argv[1]);
if (argc > 2)
RandSeed = atoi(argv[2]);
for (i = 0; i < numLines; i++) {
PrintSortedRandomLine();
}
return 0;
}
Run Code Online (Sandbox Code Playgroud)
最后,这里有一个Makefile可以像我一样在 Ubuntu 16 上使用 gcc 5.4.0 构建它们的方法:
all: sixsort sixsample
sixsort: sixsort.c
gcc -O6 -Wall -o sixsort sixsort.c
sixsample: sixsample.c
gcc -O -Wall -o sixsample sixsample.c
clean:
rm -f sixsort sixsample
Run Code Online (Sandbox Code Playgroud)
进一步可能的优化
我考虑通过用纯汇编编写它们来进一步优化其中的某些部分,但是 gcc 在内联函数、展开循环方面做得非常出色,并且通常我会考虑在汇编中自己进行大多数优化,所以很难以这种方式改进它。核心操作不太适合 SIMD 指令,因此 gcc 的版本可能会尽可能快。
如果我将它编译为 64 位进程,这可能会运行得更快,因为 CPU 能够将每个完整的记录保存在一个寄存器中,但我的测试 Ubuntu VM 是 32 位的,所以 c'est la vie。
鉴于这些值本身都相当小,您可以通过组合代码点来进一步改进我的解决方案:因此,您可以将相同的代码分割为六个计数排序,而不是对每个七位列进行一个子排序其他方式的 42 位:您可以对 14 位列中的每一个执行三个子排序。这将需要在内存中存储 3*2^14 (=3*16384=49152) 个计数和偏移量而不是 6*2^7 (=6*128=768),但由于内存相当便宜,数据仍然会适合 CPU 缓存,可能值得一试。由于原始数据是两位十进制整数,您甚至可以将其从中间切开,并对两个 6 位十进制数字列中的每一列执行一对子排序(需要 2*10^6=2000000 计数和偏移量) ,这仍然可能适合您的 CPU 缓存,具体取决于您的 CPU)。
你也可以并行化它!我的实现很难点亮一个 CPU,但让其余 CPU 闲置。因此,另一种使速度更快的技术可能是将源数据分成块,每个 CPU 内核一个,独立地对每个块进行排序,然后在最后执行类似合并排序的“合并”操作以将结果组合在一起。如果您发现自己经常运行此计算,则可能需要研究并行化工作。
结论和其他想法
虽然我自己没有测试它,但比较手写快速排序的性能可能是公平的,因为qsort()在比较值时包括函数调用开销。我怀疑一个手写的解决方案会超过 5.3 秒qsort(),但它可能不会与基数排序竞争,这既是由于快速排序的非线性增长及其不太有利的 CPU 缓存特性(它会更频繁地读取每个值平均 7 次,因此从 RAM 中提取数据可能会花费更多)。
此外,现在排序的成本与将文本文件读入内存并再次将结果写回的 I/O 成本相比是相形见绌的。即使使用我的自定义解析代码,读取 3300 万条记录仍然需要几秒钟的时间,对它们进行排序只需几分之一秒,然后再将它们写回几秒钟。如果您的数据已经在 RAM 中,那可能无关紧要,但如果它位于某个磁盘上,您也需要开始寻找优化它的方法。
但是,由于问题的目标是优化排序本身,因此我认为仅在四分之三秒内对 3300 万条记录进行排序并不算太简陋。
乔纳森·莱弗勒(Jonathan Leffler)关于重新排序包装的评论是正确的,在查看您的代码时我也有同样的想法。以下是我的方法:
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <string.h> // for memcpy
#define ROW_LENGTH 6
#define ROW_COUNT 15
// 1, 2, 3, 4, 5, 6, 6, 8, 9, 10, 13, 15, 1, 4, 5, 6, 7, 9, 1, 4, 5, 6, 7, 8, 3, 18, 19, 20, 25, 34
/*
1 2 3 4 5 6
6 8 9 10 13 15
1 4 5 6 7 9
1 4 5 6 7 8
3 18 19 20 25 34
*/
// Insertion sorting taken from /sf/answers/195267131/ with modification
static __inline__ int sortUlliArray(unsigned long long int *d, int length){
int i, j;
for (i = 1; i < length; i++) {
unsigned long long int tmp = d[i];
for (j = i; j >= 1 && tmp < d[j-1]; j--)
d[j] = d[j-1];
d[j] = tmp;
}
return i; // just to shutup compiler
}
int cmpfunc (const void * a, const void * b)
{
if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
return 0;
}
int main(){
int array[ROW_COUNT][ROW_LENGTH],
decodedResultsArray[ROW_COUNT][ROW_LENGTH];
const int rawData[] = { 1, 2, 3, 4, 5, 6,
6, 8, 9, 10, 13, 15,
1, 4, 5, 6, 7, 9,
1, 4, 5, 6, 7, 8,
3, 18, 19, 20, 25, 34,
6,17,19,20,22,46,
5,10,34,40,41,49,
5,19,20,37,53,57,
16,23,26,36,42,50,
12,19,20,30,47,53,
16,25,30,48,56,59,
8,11,25,35,36,45,
2,12,16,21,32,40,
4,6,31,36,37,44,
21,28,31,48,52,57
};
memcpy(array, rawData, sizeof(rawData)/sizeof(*rawData)); // copy elements into array memory
// Sort
// precompute keys
unsigned long long int *rowSums = calloc(ROW_COUNT, sizeof(unsigned long long int));
unsigned long long int *sortedSums = rowSums ? calloc(ROW_COUNT, sizeof(unsigned long long int)) : NULL; // if rowSums is null, don't bother trying to allocate.
if(!rowSums || !sortedSums){
free(rowSums);
free(sortedSums);
fprintf(stderr, "Failed to allocate memory!\n");
fflush(stderr); // should be unnecessary, but better to make sure it gets printed
exit(100);
}
int i=0, j=0, k=0;
for(; i < ROW_COUNT; i++){
rowSums[i] = 0; // this should be handled by calloc, but adding this for debug
for(j=0; j < ROW_LENGTH; j++){
unsigned long long int iScalar=1;
for(k=ROW_LENGTH-1; k > j; --k)
iScalar *= 100; // probably not the most efficient way to compute this, but this is meant more as an example/proof of concept
unsigned long long int iHere = array[i][j];
rowSums[i] += (iHere * iScalar);
// printf("DEBUG ITERATION REPORT\n\t\tRow #%d\n\t\tColumn #%d\n\t\tiScalar: %llu\n\t\tiHere: %llu\n\t\tCurrent Sum for Row: %llu\n\n", i, j, iScalar, iHere, rowSums[i]);
fflush(stdout);
}
}
memcpy(sortedSums, rowSums, sizeof(unsigned long long int)*ROW_COUNT);
// Some debugging output:
/*
printf("Uncopied Sums:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, rowSums[i]);
printf("Memcopyed sort array:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);
*/
clock_t begin = clock();
//qsort(sortedSums, ROW_COUNT, sizeof(unsigned long long int), cmpfunc);
sortUlliArray(sortedSums, ROW_COUNT);
clock_t end = clock();
double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
printf("Time for sort: %lf\n", time_spent);
printf("Before sort array:\n");
for(i=0; i<ROW_COUNT; i++){
for(j=0; j < ROW_LENGTH; j++){
printf("Unsorted[%d][%d] = %d\n", i, j, array[i][j]);
}
}
printf("Values of sorted computed keys:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);
// Unpack:
for(i=0; i < ROW_COUNT; i++){
for(j=0; j < ROW_LENGTH; j++){
unsigned long long int iScalar=1;
for(k=ROW_LENGTH-1; k > j; --k)
iScalar *= 100;
unsigned long long int removalAmount = sortedSums[i]/iScalar;
decodedResultsArray[i][j] = removalAmount;
sortedSums[i] -= (removalAmount*iScalar);
// DEBUG:
// printf("Decoded Result for decoded[%d][%d] = %d\n", i, j, decodedResultsArray[i][j]);
}
}
printf("\nFinal Output:\n");
for(i=0; i < ROW_COUNT; i++){
printf("Row #%d: %d", i, decodedResultsArray[i][0]);
for(j=1; j < ROW_LENGTH; j++){
printf(", %d", decodedResultsArray[i][j]);
}
puts("");
}
fflush(stdout);
free(rowSums);
free(sortedSums);
return 1;
}
Run Code Online (Sandbox Code Playgroud)
请注意,这并不是为了最大效率而全部优化的,并且它散布着调试输出语句,但尽管如此,它还是打包如何工作的概念证明。另外,考虑到您必须处理的行数,您可能会更好地使用qsort(),但我使用它(这是此 StackOverflow 答案sortUlliArray(...)中的插入排序函数的修改版本)。您必须对其进行测试,看看哪种方法最适合您的情况。
总而言之,在 15 个硬编码行上运行此代码的最终输出是:
Row #0: 1, 2, 3, 4, 5, 6
Row #1: 1, 4, 5, 6, 7, 8
Row #2: 1, 4, 5, 6, 7, 9
Row #3: 2, 12, 16, 21, 32, 40
Row #4: 3, 18, 19, 20, 25, 34
Row #5: 4, 6, 31, 36, 37, 44
Row #6: 5, 10, 34, 40, 41, 49
Row #7: 5, 19, 20, 37, 53, 57
Row #8: 6, 8, 9, 10, 13, 15
Row #9: 6, 17, 19, 20, 22, 46
Row #10: 8, 11, 25, 35, 36, 45
Row #11: 12, 19, 20, 30, 47, 53
Row #12: 16, 23, 26, 36, 42, 50
Row #13: 16, 25, 30, 48, 56, 59
Row #14: 21, 28, 31, 48, 52, 57
Run Code Online (Sandbox Code Playgroud)
因此,这似乎确实可以处理数字非常相似的情况,这是由于数字打包顺序造成的问题。
无论如何,上面的代码应该可以工作,但它只是作为示例,所以我将留给您应用必要的优化。
代码在配备 1.6 GHz Intel Core i5 和 4 GB 1600 MHz DDR3 的 MacBook Air 64 位上进行了测试。因此,CPU 相当弱,内存也很慢,但它能够在 0.004 毫秒内对 15 行进行排序,在我看来,相当快。(这只是上述测试用例的排序函数速度的衡量标准,而不是预打包或解包速度的衡量标准,因为这些可以使用一些优化。)
主要功劳归功于 Doynax 和 Jonathan Leffler。
| 归档时间: |
|
| 查看次数: |
248 次 |
| 最近记录: |