续体的序列化

下面是一个使用 Continuation API 进行序列化的 Java 应用程序。此应用程序通过将其状态保存到文件来持久化。每次运行时,程序都会增加计数器,显示更新后的值,然后退出。

package com.oracle.truffle.espresso.test.continuations;

import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.graalvm.continuations.Continuation;
import org.graalvm.continuations.ContinuationEntryPoint;
import org.graalvm.continuations.SuspendCapability;

/**
 * Application that persists its state by saving it to a file.
 * <p>
 * Each time the program is run, it increments the counter and prints out the new value before quitting.
 * <p>
 * By default, the state is persisted to a file named "state.serial.bin" in the current working directory,
 * but it can be changed by specifying a new path with the {@code "-p <path>"} option.
 * <p>
 * By default, standard Java serialization is used, but "Kryo" can be selected with the {@code "-s kryo"} option.
 * <p>
 * The continuation payload must implement `ContinuationEntryPoint`.
 * This class is also `Serializable` to work with Java serialization.
 */
public class PersistentApp implements ContinuationEntryPoint, Serializable {
    /**
     * An interface for serializing/deserializing a continuation to the file system.
     * Two implementations are showcased later: one for `Java` and one for `Kryo`.
     */
    public interface MySerializer {
        Continuation load(Path storagePath) throws IOException, ClassNotFoundException;

        void saveTo(Continuation continuation, Path storagePath) throws IOException;
    }

    private static final String DEFAULT_PATH = "state.serial.bin";

    int counter = 0;

    /**
     * Anything reachable from the stack in this method is persisted, including 'this'.
     * <p>
     * Suspending a continuation requires access to this “suspend capability” object.
     * By controlling who gets access to it, you can work out where a suspension might occur.
     * If you do not want this, the capability can be stored it in a static `ThreadLocal` and let anything suspend.
     */
    @Override
    public void start(SuspendCapability suspendCapability) {
        while (true) {
            counter++;
            System.out.println("The counter value is now " + counter);

            doWork(suspendCapability);
        }
    }

    private static void doWork(SuspendCapability suspendCapability) {
        // Do something ...
        /*
         * The call to `suspend` causes control flow to return from the call to resume below.
         * The state of the application will be written to the file system and it will carry on when the user starts the application again.
         */
        suspendCapability.suspend();
        // Do something else ...
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        checkSupported();

        Path storagePath = getStoragePath(args);
        MySerializer ser = getSerializer(args);

        Continuation continuation = loadOrInit(storagePath, ser);
        /*
         * Control flow will either begin at `start` for the first program execution,
         * or jump to after the call to `suspend` above for later executions.
         */
        continuation.resume();
        ser.saveTo(continuation, storagePath);
    }

    private static void checkSupported() {
        try {
            if (!Continuation.isSupported()) {
                System.err.println("Ensure you are running on an Espresso VM with the flags '--experimental-options --java.Continuum=true'.");
                System.exit(1);
            }
        } catch (NoClassDefFoundError e) {
            System.err.println("Please make sure you are using a VM that supports the Continuation API");
            System.exit(1);
        }
    }

    /////////////////////////////////////////////////////////////
    // Code to load, save and resume the state of the program. //
    /////////////////////////////////////////////////////////////

    private static Path getStoragePath(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String s = args[i];
            if (s.equals("-p") && (args.length > i + 1)) {
                return Paths.get(args[i + 1]);
            }
        }

        return Paths.get(DEFAULT_PATH);
    }

    private static Continuation loadOrInit(Path storagePath, MySerializer ser) throws IOException, ClassNotFoundException {
        Continuation continuation;
        if (!Files.exists(storagePath)) {
            /*
             * First execution of the program with the specified path: use a fresh continuation.
             */
            continuation = Continuation.create(new PersistentApp());
        } else {
            /*
             * Program had been executed at least once with the specified path: restore the continuation from file.
             */
            continuation = ser.load(storagePath);
        }
        return continuation;
    }

    private static MySerializer getSerializer(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String s = args[i];
            if (s.equals("-s") && (args.length > i + 1)) {
                String key = args[i + 1];
                if (key.equals("java")) {
                    return new MyJavaSerializer();
                }
                if (key.equals("kryo")) {
                    return new MyKryoSerializer();
                }
            }
        }
        return new MyJavaSerializer();
    }
}

请注意 MySerializer 接口。我们将实现该接口,以展示两种不同的序列化续体的方式

  • 一种使用标准 Java 序列化(使用内置的 ObjectInputStreamObjectOutputStream);
  • 另一种使用流行且快速的 Kryo 库。

Java #

以下是使用标准 Java 序列化的 MySerializer 实现

:warning: Java 对象序列化要求从捕获的栈可访问的所有内容都实现 Serializable 接口,并且(理想情况下)通过序列化过滤器进行白名单。这不如 Kryo 方便,Kryo 可以轻松序列化任何内容。其格式也比 Kryo 的更冗长,产生的续体大小约为两倍。但是,它避免了对单独依赖的需求。

import static java.nio.file.StandardOpenOption.*;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

import org.graalvm.continuations.Continuation;

class MyJavaSerializer implements PersistentApp.MySerializer {
    @Override
    public Continuation load(Path storagePath) throws IOException, ClassNotFoundException {
        try (var in = new ObjectInputStream(Files.newInputStream(storagePath, READ))) {
            return (Continuation) in.readObject();
        }
    }

    @Override
    public void saveTo(Continuation continuation, Path storagePath) throws IOException {
        // Will overwrite previously existing file if any.
        try (var out = new ObjectOutputStream(Files.newOutputStream(storagePath, CREATE, TRUNCATE_EXISTING, WRITE))) {
            out.writeObject(continuation);
        }
    }
}

Kryo #

以下是使用 Kryo 库的 MySerializer 实现

import static java.nio.file.StandardOpenOption.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.graalvm.continuations.Continuation;
import org.graalvm.continuations.ContinuationSerializable;
import org.objenesis.strategy.StdInstantiatorStrategy;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.KryoObjectInput;
import com.esotericsoftware.kryo.io.KryoObjectOutput;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;

class MyKryoSerializer implements PersistentApp.MySerializer {
    // We set up the Kryo engine here.
    private static final Kryo kryo = setupKryo();

    private static Kryo setupKryo() {
        var kryo = new Kryo();
        // The heap will have cycles, and Kryo requires us to opt in to support for that.
        kryo.setReferences(true);
        // We do not want to manually register everything, as heap contents are dynamic.
        kryo.setRegistrationRequired(false);
        // Be able to create objects even if they lack a no-arg constructor.
        kryo.setInstantiatorStrategy(
                new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
        /*
         * Register a custom serializer for continuation objects.
         * All serialization-relevant classes in the Continuation API will extend the `ContinuationSerializable` class.
         */
        kryo.addDefaultSerializer(ContinuationSerializable.class, new ContinuationSerializer());
        return kryo;
    }
    
    /**
     * A custom Kryo `Serializer` for continuation objects.
     */
    static class ContinuationSerializer extends Serializer<ContinuationSerializable> {
        public ContinuationSerializer() {
            super(false, false);
        }

        @Override
        public void write(Kryo kryo, Output output, ContinuationSerializable object) {
            try {
                ContinuationSerializable.writeObjectExternal(object, new KryoObjectOutput(kryo, output));
            } catch (IOException e) {
                throw new KryoException(e);
            }
        }

        @Override
        public ContinuationSerializable read(Kryo kryo, Input input, Class<? extends ContinuationSerializable> type) {
            try {
                /*
                 * The continuation deserialization mechanism will use this classloader to load the classes present on the heap.
                 * Kryo requires awareness of created objects in order to handle cycles in the serialized object graph.
                 * Let Kryo know about the deserialized objects using kryo::reference.
                 */
                return ContinuationSerializable.readObjectExternal(type, new KryoObjectInput(kryo, input),
                        kryo.getClassLoader(),
                        kryo::reference);
            } catch (IOException | ClassNotFoundException e) {
                throw new KryoException(e);
            }
        }
    }

    @Override
    public Continuation load(Path storagePath) throws IOException {
        try (var in = new Input(Files.newInputStream(storagePath, READ))) {
            return kryo.readObject(in, Continuation.class);
        }
    }

    @Override
    public void saveTo(Continuation continuation, Path storagePath) throws IOException {
        try (var out = new Output(Files.newOutputStream(storagePath, CREATE, TRUNCATE_EXISTING, WRITE))) {
            kryo.writeObject(out, continuation);
        }
    }
}

延伸阅读 #

联系我们