
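Uploading Direct Buffer data obtained through JNI to S3 without first materializing it on the JVM heap avoids an unnecessary full-size memory copy and improves performance. The usual approach copies the Direct Buffer into heap memory and then uploads the copy, which increases memory usage and adds copy overhead. To avoid this, we can write a custom subclass of Guava's ByteSource, which jclouds accepts as a blob payload, and have it expose the Direct Buffer as an input stream. The data is then read in small chunks as the upload proceeds, so a second full copy of the buffer is never allocated.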
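For comparison, the copy-based baseline described above might look like the following minimal sketch. The class and method names (`HeapCopyBaseline`, `copyToHeap`) are hypothetical and only illustrate the extra heap allocation; they are not part of jclouds or of the original code.

```java
import java.nio.ByteBuffer;

final class HeapCopyBaseline {
    /** Copies all remaining bytes of the buffer into a newly allocated heap array. */
    static byte[] copyToHeap(ByteBuffer source) {
        // duplicate() shares the memory but keeps the caller's position untouched
        ByteBuffer view = source.duplicate();
        byte[] heapCopy = new byte[view.remaining()]; // full-size allocation on the heap
        view.get(heapCopy);
        return heapCopy;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);
        for (int i = 0; i < direct.capacity(); i++) {
            direct.put((byte) i);
        }
        direct.flip();
        byte[] copy = copyToHeap(direct);
        System.out.println("direct=" + direct.isDirect() + ", copied bytes=" + copy.length);
    }
}
```

For a 50MB buffer this allocates another 50MB on the heap, which is the cost the streaming approach is meant to avoid.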
The implementation steps are as follows:
- Create a custom ByteBufferByteSource class

This class extends ByteSource and holds a ByteBuffer. Its openStream() method returns a custom ByteBufferInputStream that reads from the ByteBuffer.

    import com.google.common.io.ByteSource;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    import static com.google.common.base.Preconditions.checkNotNull;

    public class ByteBufferByteSource extends ByteSource {
        private final ByteBuffer buffer;

        public ByteBufferByteSource(ByteBuffer buffer) {
            this.buffer = checkNotNull(buffer);
        }

        @Override
        public InputStream openStream() {
            // duplicate() shares the memory but gives each stream its own position,
            // so the source can be opened more than once (for example on a retry).
            return new ByteBufferInputStream(buffer.duplicate());
        }

        private static final class ByteBufferInputStream extends InputStream {
            private final ByteBuffer buffer;
            private boolean closed = false;

            ByteBufferInputStream(ByteBuffer buffer) {
                this.buffer = buffer;
            }

            @Override
            public synchronized int read() throws IOException {
                if (closed) {
                    throw new IOException("Stream already closed");
                }
                if (!buffer.hasRemaining()) {
                    return -1; // end of stream
                }
                return buffer.get() & 0xFF; // Important: convert byte to unsigned int
            }

            @Override
            public synchronized int read(byte[] b, int off, int len) throws IOException {
                if (closed) {
                    throw new IOException("Stream already closed");
                }
                if (b == null) {
                    throw new NullPointerException();
                } else if (off < 0 || len < 0 || len > b.length - off) {
                    throw new IndexOutOfBoundsException();
                } else if (len == 0) {
                    return 0;
                }
                int available = buffer.remaining();
                if (available == 0) {
                    return -1;
                }
                int readLength = Math.min(len, available);
                buffer.get(b, off, readLength); // bulk copy of one chunk into the caller's array
                return readLength;
            }

            @Override
            public synchronized void close() throws IOException {
                super.close();
                closed = true;
            }
        }
    }
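A ByteSource may be opened more than once, for example when an upload is retried, so each stream needs its own read position. The following sketch, which uses only the JDK, shows how `ByteBuffer.duplicate()` provides such independent views over the same memory. The class name `IndependentViews` and the helper `drain` are hypothetical.

```java
import java.nio.ByteBuffer;

final class IndependentViews {
    /** Reads every remaining byte through a private view and returns the count. */
    static int drain(ByteBuffer source) {
        // The duplicate shares the underlying memory but owns its position and limit.
        ByteBuffer view = source.duplicate();
        int count = 0;
        while (view.hasRemaining()) {
            view.get();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        direct.put(new byte[] {1, 2, 3, 4});
        direct.flip();

        int first = drain(direct);
        int second = drain(direct); // a shared position would leave nothing to read here
        System.out.println(first + " " + second + " position=" + direct.position());
    }
}
```

Because neither pass moves the position of the original buffer, the second read sees the same bytes as the first.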
- Upload the data with the custom ByteBufferByteSource

When uploading, wrap the Direct Buffer in a ByteBufferByteSource and pass it to blobBuilder as the payload.

    import org.jclouds.ContextBuilder;
    import org.jclouds.blobstore.BlobStore;
    import org.jclouds.blobstore.BlobStoreContext;
    import org.jclouds.blobstore.domain.Blob;
    import java.nio.ByteBuffer;

    // Wrapped in a class named S3Uploader to match the Java example further down.
    public class S3Uploader {
        public String uploadDirectBuffer(String container, String objectKey, ByteBuffer bb) {
            BlobStoreContext context = ContextBuilder.newBuilder("aws-s3") // replace with your S3 provider
                    .credentials("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY") // replace with your credentials
                    .buildView(BlobStoreContext.class);
            try {
                BlobStore blobStore = context.getBlobStore();
                ByteBufferByteSource byteSource = new ByteBufferByteSource(bb);
                Blob blob = blobStore.blobBuilder(objectKey)
                        .payload(byteSource)
                        // remaining() is what the stream delivers; capacity() can be larger
                        .contentLength(bb.remaining())
                        .build();
                blobStore.putBlob(container, blob);
                return objectKey;
            } finally {
                context.close(); // release the context even if the upload fails
            }
        }
    }

Notes:
- Replace the S3 provider and credentials in the code with your own.
- Make sure the jclouds library has been added to the project.
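The content length declared for the blob should match the number of bytes the stream will actually deliver. For a ByteBuffer that is `remaining()` (limit minus position), which equals `capacity()` only when the buffer is completely filled and positioned at zero, as happens for a buffer freshly returned by `NewDirectByteBuffer`. The sketch below illustrates the difference with a partially filled buffer; `ContentLengthDemo` and `payloadLength` are hypothetical names.

```java
import java.nio.ByteBuffer;

final class ContentLengthDemo {
    /** Number of bytes a stream over this buffer would deliver. */
    static long payloadLength(ByteBuffer buffer) {
        return buffer.remaining(); // limit - position, independent of capacity
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);
        direct.put(new byte[100]); // only 100 bytes of real data
        direct.flip();             // limit = 100, position = 0

        System.out.println("capacity=" + direct.capacity()
                + ", payload length=" + payloadLength(direct));
    }
}
```

Declaring the capacity in this case would announce more bytes than the stream can supply, and the upload would likely fail or stall.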
- JNI code example

    #include <jni.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <sys/mman.h>
    #include <unistd.h>

    #define SHM_NAME "/my_shared_memory"
    #define SHM_SIZE (200 * 1024 * 1024) // 200MB

    // The env->... call style requires compiling as C++;
    // extern "C" keeps the JNI symbol names unmangled.
    extern "C" {

    // The Java methods are instance methods, so the second argument is the receiver object.
    JNIEXPORT jobject JNICALL
    Java_service_SharedMemoryJNIService_getDirectByteBuffer(JNIEnv *env, jobject thiz, jlong buf_addr, jint buf_len) {
        return env->NewDirectByteBuffer((void *)buf_addr, buf_len);
    }

    JNIEXPORT jlong JNICALL
    Java_service_SharedMemoryJNIService_createSharedMemory(JNIEnv *env, jobject thiz) {
        int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
        if (shm_fd == -1) {
            perror("shm_open");
            return -1;
        }
        if (ftruncate(shm_fd, SHM_SIZE) == -1) {
            perror("ftruncate");
            close(shm_fd);
            return -1;
        }
        void *ptr = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
        if (ptr == MAP_FAILED) {
            perror("mmap");
            close(shm_fd);
            return -1;
        }
        close(shm_fd); // No longer needed after mmap
        return (jlong)ptr;
    }

    JNIEXPORT void JNICALL
    Java_service_SharedMemoryJNIService_writeToSharedMemory(JNIEnv *env, jobject thiz, jlong buf_addr, jint data_size) {
        char *ptr = (char *)buf_addr;
        for (int i = 0; i < data_size; ++i) {
            ptr[i] = 'A' + (i % 26); // Example data
        }
    }

    JNIEXPORT void JNICALL
    Java_service_SharedMemoryJNIService_unmapSharedMemory(JNIEnv *env, jobject thiz, jlong buf_addr) {
        void *ptr = (void *)buf_addr;
        if (munmap(ptr, SHM_SIZE) == -1) {
            perror("munmap");
        }
    }

    JNIEXPORT void JNICALL
    Java_service_SharedMemoryJNIService_deleteSharedMemory(JNIEnv *env, jobject thiz) {
        if (shm_unlink(SHM_NAME) == -1) {
            perror("shm_unlink");
        }
    }

    } // extern "C"
- Java code example

    package service;

    import java.nio.ByteBuffer;

    public class SharedMemoryJNIService {
        static {
            System.loadLibrary("sharedmemory"); // Load the sharedmemory library
        }

        public native ByteBuffer getDirectByteBuffer(long buf_addr, int buf_len);
        public native long createSharedMemory();
        public native void writeToSharedMemory(long buf_addr, int data_size);
        public native void unmapSharedMemory(long buf_addr);
        public native void deleteSharedMemory();

        public static void main(String[] args) {
            SharedMemoryJNIService service = new SharedMemoryJNIService();
            long sharedMemoryAddress = service.createSharedMemory();
            if (sharedMemoryAddress == -1) {
                System.err.println("Failed to create shared memory.");
                return;
            }

            int dataSize = 50 * 1024 * 1024; // 50MB
            service.writeToSharedMemory(sharedMemoryAddress, dataSize);
            ByteBuffer directBuffer = service.getDirectByteBuffer(sharedMemoryAddress, dataSize);

            // Example usage with the S3 uploader
            S3Uploader s3Uploader = new S3Uploader(); // Replace with your S3 uploader class
            String objectKey = "my-object";
            String bucketName = "my-bucket";
            try {
                String uploadedKey = s3Uploader.uploadDirectBuffer(bucketName, objectKey, directBuffer);
                System.out.println("Uploaded to S3: " + uploadedKey);
            } catch (Exception e) {
                System.err.println("Failed to upload to S3: " + e.getMessage());
                e.printStackTrace();
            } finally {
                // The buffer must not be used after this point: its memory is unmapped.
                service.unmapSharedMemory(sharedMemoryAddress);
                service.deleteSharedMemory();
            }
        }
    }

Notes:
- Make sure the JNI environment and the native library are configured correctly.
- Replace S3Uploader with your actual S3 upload class.
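To exercise the stream and upload code without building the native library, a memory-mapped file can stand in for the JNI buffer, since a MappedByteBuffer is also a direct buffer. The sketch below is an assumption about how one might test locally, not part of the original setup; `MappedBufferStandIn` and `mapPattern` are hypothetical names, and the fill pattern mirrors the `'A' + (i % 26)` example data written by the native code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedBufferStandIn {
    /** Maps a temporary file of the given size and fills it with the 'A'..'Z' pattern. */
    static ByteBuffer mapPattern(int size) {
        try {
            Path file = Files.createTempFile("direct-buffer-demo", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // A READ_WRITE mapping extends the file to the requested size.
                ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
                for (int i = 0; i < size; i++) {
                    mapped.put(i, (byte) ('A' + (i % 26))); // absolute put, position stays 0
                }
                return mapped; // the mapping stays valid after the channel is closed
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = mapPattern(1024);
        System.out.println("direct=" + buffer.isDirect() + ", remaining=" + buffer.remaining());
    }
}
```

The returned buffer can be passed to the same code paths as the buffer obtained from JNI.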
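Summary

With a custom ByteSource, we avoid copying the whole Direct Buffer into the JVM heap before uploading. This reduces memory usage and copy overhead and improves performance. In practice, ByteBufferInputStream can be tuned to specific needs, for example by implementing more efficient read operations.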
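As one possible direction for such tuning, the sketch below overrides `available()` and `skip()` so callers can query and advance the position without reading bytes one at a time. It uses only the JDK; the class name `FastByteBufferInputStream` is hypothetical, and whether these overrides help depends on how the HTTP client consumes the stream.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

final class FastByteBufferInputStream extends InputStream {
    private final ByteBuffer buffer;

    FastByteBufferInputStream(ByteBuffer source) {
        this.buffer = source.duplicate(); // private position, shared memory
    }

    @Override
    public int read() {
        return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1;
    }

    @Override
    public int read(byte[] dst, int off, int len) {
        if (off < 0 || len < 0 || len > dst.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }
        if (!buffer.hasRemaining()) {
            return -1;
        }
        int n = Math.min(len, buffer.remaining());
        buffer.get(dst, off, n); // one bulk copy per call
        return n;
    }

    @Override
    public int available() {
        return buffer.remaining(); // exact, because all data is already in memory
    }

    @Override
    public long skip(long n) {
        if (n <= 0) {
            return 0;
        }
        int skipped = (int) Math.min(n, buffer.remaining());
        buffer.position(buffer.position() + skipped); // move the position, copy nothing
        return skipped;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        direct.put(new byte[] {10, 20, 30, 40, 50, 60, 70, 80});
        direct.flip();
        FastByteBufferInputStream in = new FastByteBufferInputStream(direct);
        in.skip(6);
        System.out.println("available=" + in.available() + ", next=" + in.read());
    }
}
```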










