Data

Introduction

This document defines the protocol used to transmit messages across the network.

Concepts

Types

Data that flows through Marlin can be either transactions or blocks of any blockchain protocol. The protocol can also be extended to other types of data, provided relevant spam detection mechanisms are in place. In this document we focus on the propagation of blocks through Marlin.

Constants

Name                   Type     Value
NO_OF_CLUSTER_OPTIONS  uint8    TBD
CHUNKS_PER_PEER        uint8    TBD
CHUNK_REPORT_LIMIT     bytes32  TBD
TOLERABLE_LATENCY      uint     TBD
STAKE_PER_BYTE         uint     TBD

Data Structures

Type Aliases

Name         Type     Description
Time         int      Timestamp in unix format
PeerId       bytes32  Public identifier of a peer
MessageId    bytes32  Unique id for the message
MessageType  uint8    Type of the protocol for which the message is being sent

Chunk

struct Chunk {
    byte[] bytes
    Time timestamp
    PeerId sender
    MessageId messageId
    bytes32 id
    MessageType messageType
    Witness witness
}

ChunkId

struct ChunkId {
    bytes[] partialWitness // only signature and witness from the previous hop
    bytes32 id
}

ParentPeer

struct ParentPeer {
    PeerId peerId
    Score score
}

Witness

struct Witness {
    bytes[] data
    bytes signature // 65 bytes, see Witness Format below
}

Storage

chunkCache

mapping(MessageId => (Time timestamp, mapping(ChunkId.id => Chunk) chunks)) chunkCache

messageCache

mapping(MessageId => Message) messageCache

ackCache

mapping((MessageId, ChunkId) => Message) ackCache

activePeers

PeerId[] activePeers

NUM_CHUNKS

mapping(MessageType => uint32) NUM_CHUNKS

MIN_CHUNKS

mapping(MessageType => uint32) MIN_CHUNKS
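To illustrate how chunkCache and MIN_CHUNKS interact, here is a minimal in-memory sketch (assumptions: Python dicts stand in for the mappings, Chunk is reduced to the fields used here, and a single message type 0 with MIN_CHUNKS = 2 is hard-coded for the example):

```python
from dataclasses import dataclass, field

@dataclass
class Chunk:
    chunk_id: int
    data: bytes
    timestamp: int

@dataclass
class ChunkCacheEntry:
    timestamp: int
    chunks: dict = field(default_factory=dict)  # chunk_id -> Chunk

chunk_cache: dict = {}   # message_id -> ChunkCacheEntry
MIN_CHUNKS = {0: 2}      # message_type -> chunks needed to reconstruct

def cache_chunk(message_id: bytes, chunk: Chunk, message_type: int = 0) -> bool:
    """Store a chunk; return True once enough chunks exist to reconstruct."""
    entry = chunk_cache.setdefault(message_id, ChunkCacheEntry(chunk.timestamp))
    entry.chunks.setdefault(chunk.chunk_id, chunk)  # duplicates are ignored
    return len(entry.chunks) >= MIN_CHUNKS[message_type]
```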

Operations

Types of Data

Block data is currently in scope for the relay, though future versions might also support transaction data as well as generic data.

Block data is generally significantly large and is hence divided into multiple parts using erasure coding techniques to ensure redundancy and reliability of delivery.

Data Format

Data that is propagated in the network can be divided into 3 components.

  1. Chunk
  2. Witness
  3. Attestation

Message Format

| Attestation | Witness | Chunk |

Chunk

A chunk is the message that is propagated through the cluster to the receivers. Chunks are the result of erasure coding the block data (more details here // TODO: Insert hyperlink). A specified number of chunks is necessary for the receiver to reconstruct the message.

Witness

A witness is the proof that data was transmitted into the network by a specific cluster. It contains the signature of a member of the cluster along with the timestamp at which the chunk was sent to the cluster. A witness can also be used as proof that the cluster propagated the message, and its timestamp is used for audit purposes.

Witness Format

| Message Hash | Witness Signature |
 <- 32 Bytes -> <--- 65 Bytes ---->

Message Hash - The hash of the entire message transmitted by the cluster, which includes the chunk data, timestamp and attestation. It is part of the witness to ensure that the signature is specific to this message and cannot be reused for other messages.

Witness Signature - A signature over the message hash by one of the members of the cluster registered in the cluster marketplace.

Witness Wire Format

| Witness Signature |

The message hash need not be transmitted on the wire as it can be derived from the message sent along with the witness.
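A minimal sketch of the witness layout above (assumptions: hashlib's sha3_256 stands in for the message-hash function, and a fixed 65-byte blob stands in for a real signature by a cluster member's key):

```python
import hashlib

def make_witness(message: bytes, signature: bytes) -> bytes:
    """Full witness: 32-byte message hash || 65-byte witness signature."""
    assert len(signature) == 65
    msg_hash = hashlib.sha3_256(message).digest()
    return msg_hash + signature

def witness_wire(witness: bytes) -> bytes:
    """Wire format drops the hash; only the signature is transmitted."""
    return witness[32:]
```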

Attestation

|  Message Id |  Channel Id |   Chunk Id  |  Timestamp  | Chunk Length | Stake Offset |  Chunk Hash  | Attestation Signature |
 <- 4 Bytes -> <- 2 Bytes -> <- 2 Bytes -> <- 4 Bytes -> <- 4 Bytes --> <- 4 Bytes --> <- 32 Bytes -> <------ 65 Bytes ----->

The attestation specified above consists of Message Id, Channel Id, Chunk Id, Timestamp, Chunk Length, Stake Offset, Chunk Hash and Attestation Signature.

Message Id is the unique identifier provided for each message that is sent. It consists of the first 4 bytes of the hash of the block being sent as the message, which is verified at the receiver.

Channel Id is the unique id provided to each channel on which the message is being sent. Each channel corresponds to a blockchain platform and hence messages in each channel conform to a specific message format.

Chunk Id is the id provided to each chunk created from the message. Chunk ids are unique and sequential, starting from 0.

Timestamp specifies the time at which the message is broadcast. The cluster will wait if the local timestamp is behind the chunk timestamp. The timestamp is necessary to ensure that a relayer's message is not replayed.

Chunk Length is necessary to know the length of the data expected as part of the chunk. Stake Offset is the running offset of the stake attached to the message being sent. If the same stake is attached to another message within a refresh interval, the producer will be slashed.

Chunk Hash is the hash of the chunk obtained by erasure coding the message that is relayed on the network. More details about how chunks are created are explained here. The chunk hash is essential to link the chunk that was sent to the network with the attestation.

Attestation Signature is the signature by the producer to attest to the details of the message. It is necessary to hold the producer responsible in case the message sent is invalid or spam.

Note: Spam prevention is necessary to ensure that spam doesn't make the number of chunk combinations to try during reconstruction too large.

Attestation Wire Format

| Message Id | Channel Id |  Chunk Id  | Timestamp | Stake Offset | Attestation Signature |
 <- 4 Bytes -> <- 2 Bytes -> <- 2 Bytes -> <- 4 Bytes -> <- 4 Bytes -> <----- 65 Bytes ----->
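The wire layout can be sketched by packing the fields with the byte widths above (a sketch under assumptions: big-endian integers, which the spec does not fix, and a 65-byte signature blob supplied by the caller):

```python
import struct

def pack_attestation_wire(message_id: bytes, channel_id: int, chunk_id: int,
                          timestamp: int, stake_offset: int,
                          signature: bytes) -> bytes:
    """4B message id | 2B channel id | 2B chunk id | 4B timestamp |
    4B stake offset | 65B attestation signature."""
    assert len(message_id) == 4 and len(signature) == 65
    header = message_id + struct.pack(">HHII", channel_id, chunk_id,
                                      timestamp, stake_offset)
    return header + signature
```

The total comes to 81 bytes, which is also the fixed overhead a producer pays per chunk on the wire.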

Acknowledgement

When a producer sends a message to a cluster for distribution, the cluster sends an acknowledgement back to the producer on receipt of the payment for the message. Acknowledgements are necessary to prove that the entire block was sent to the network. If the producer is not able to produce enough acknowledgements, the producer is liable to be slashed.

Acknowledgment format

| Message Id | Chunk Id | Cluster Id | Timestamp | Producer Id | Cluster Member Sign |
 <- 4 Bytes -> <- 2 Bytes -> <- 2 Bytes -> <- 4 Bytes -> <- 20 Bytes -> <---- 65 Bytes ----->

Message Id and Chunk Id are the same as described above. They are necessary to ensure that the acknowledgement points to a specific message chunk.

Cluster Id is used to identify the cluster to which the message is sent. It is used when verifying proof that the message was actually sent to the correct cluster as defined by the protocol.

Timestamp identifies the time at which the acknowledgement was sent, which indicates when the message was processed.

Producer Id is used to identify the producer who has propagated the block to the network.

Cluster Member Signature is used to ensure that the acknowledgement was provided by the member of the cluster accepting the incoming message from the producer.

Setup

Marlin provides support for plugins that can be installed at each subscriber and publisher node for a specific protocol to preprocess the data, producing protocol-specific data and performing optimizations. The plugin also includes a post-processor to translate the optimized data sent through the Marlin network back to its original form.

There are different pre- and post-processing plugins for transactions and blocks. Once the data is propagated by the source, the cluster/relay nodes don't need to differentiate between the different types of data that flow through them. Transactions use plugins that validate the message according to the protocol and apply compression techniques. Blocks use plugins that validate the block, convert it into a serialization format such as RLPx or an efficient custom format, and compress it using techniques like Xthinner or Graphene.

Plugins can also act as a playground for testing improvements to data serialization and compression techniques, and for analyzing new algorithms under real-world conditions. Any custom compression technique can also be used, with plugins made available to the relevant users.

Data Entry

TODO: What happens when multiple producers send the block simultaneously? In this case, the receiver will not drop the message even if the same message is sent again.

Generate Message Id

A producer creates a message to be broadcast to the network. The producer then generates a unique id based on the first 4 bytes of the block hash.

func bytes[] generateMessageId(bytes[] message):
    // First 4 bytes of the message hash, i.e. 8 hex characters
    messageId = message.hash[0:8]
    return messageId
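A runnable sketch of generateMessageId (assumption: hashlib's sha3_256 stands in for the block-hash function, which the spec does not name; only the "first 4 bytes of the hash" rule is taken from the text):

```python
import hashlib

def generate_message_id(message: bytes) -> bytes:
    """Message id = first 4 bytes of the hash of the message."""
    return hashlib.sha3_256(message).digest()[:4]
```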

TODO: Take a look at 2D erasure coding.

Erasure Coding of the block

Sending each chunk to a different cluster ensures that even if a few chunks are dropped by malicious clusters, the message can still be reconstructed at the receiver. The message is divided into NUM_CHUNKS[message_type] chunks using Rabin's algorithm as shown below. MIN_CHUNKS[message_type] chunks are required to reconstruct the block. All encoding and decoding operations are done in the finite field mod(P).

func bytes[][] getChunksFromMessage(bytes[] message, MessageType message_type):
    chunk_length = MIN_CHUNKS[message_type]
    num_of_chunks = ceiling(message.length / chunk_length)
    padded_message = pad_left(message, chunk_length * num_of_chunks)
    for chunk_number in range(0, num_of_chunks)
        original_chunked_data.push(padded_message[(chunk_number * chunk_length):((chunk_number + 1) * chunk_length)])
    for encoded_chunk_number in range(0, NUM_CHUNKS[message_type])
        encoded_chunked_message = []
        for chunk_number in range(0, num_of_chunks)
            encoded_chunked_character = 0
            for character_number in range(0, MIN_CHUNKS[message_type])
                encoded_chunked_character += original_chunked_data[chunk_number][character_number] * ENCODER_DATA[encoded_chunk_number][character_number]
            encoded_chunked_message.push(encoded_chunked_character)
        encoded_message.push(encoded_chunked_message)
    return encoded_message
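The encoding above can be sketched in Python over a small prime field (a sketch under assumptions: P = 257 so every byte value fits in the field, a Vandermonde matrix stands in for ENCODER_DATA, and min_chunks/num_chunks are passed as plain integers rather than looked up by message type):

```python
P = 257  # prime field modulus (assumption; large enough to hold byte values)

def encode(message: list, min_chunks: int, num_chunks: int) -> list:
    """Split message into segments of min_chunks symbols and emit num_chunks
    encoded chunks; any min_chunks of them suffice to reconstruct."""
    # Pad so the message divides evenly into segments.
    padded = message + [0] * (-len(message) % min_chunks)
    segments = [padded[i:i + min_chunks]
                for i in range(0, len(padded), min_chunks)]
    # Vandermonde encoder: row i = [ (i+1)^j mod P for j in 0..min_chunks-1 ].
    encoder = [[pow(i + 1, j, P) for j in range(min_chunks)]
               for i in range(num_chunks)]
    # Encoded chunk i holds one field element per segment.
    return [[sum(s[k] * encoder[i][k] for k in range(min_chunks)) % P
             for s in segments] for i in range(num_chunks)]
```

With min_chunks = 2 and num_chunks = 4, a 4-symbol message produces 4 chunks of 2 symbols each, any 2 of which recover the message.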

Get cluster options for a chunk

In the case of a block, the message is broken into chunks using erasure coding. Each chunk is allocated a set of NO_OF_CLUSTER_OPTIONS clusters based on the hash of the messageId and the chunk number: the NO_OF_CLUSTER_OPTIONS clusters whose cluster ids are closest to the hash are the options through which the chunk can be propagated in the network.

func clusterId[] getClusterOptions(bytes[] messageId, uint chunkNo):
    bytes32 hash = Keccak256(messageId, chunkNo)
    clusterId[] clusters = getClusterList()
    uint index = indexInClusters(hash, clusters)
    clusterId[] selectedClusters
    if(index == 0) {
        return clusters[0..(NO_OF_CLUSTER_OPTIONS-1)]
    }
    int l = index - 1
    int g = index
    for(uint i = 0; i < NO_OF_CLUSTER_OPTIONS; i++) {
        if(l < 0 || (g < clusters.length && clusters[g] - hash < hash - clusters[l])) {
            selectedClusters.push(clusters[g])
            g++
        } else {
            selectedClusters.push(clusters[l])
            l--
        }
    }
    return selectedClusters
func uint indexInClusters(bytes32 hash, clusterId[] clusters):
    // TODO: Binary search to find the index at which the hash can be inserted in the clusters array
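The nearest-cluster walk above can be sketched in Python (assumptions: cluster ids and the hash are plain integers and "closest" means numeric distance; real ids would be 32-byte values compared the same way, and NO_OF_CLUSTER_OPTIONS is TBD in the constants table, so 3 is an example value):

```python
import bisect

NO_OF_CLUSTER_OPTIONS = 3  # example value; TBD in the constants table

def get_cluster_options(h: int, clusters: list) -> list:
    """Pick the NO_OF_CLUSTER_OPTIONS cluster ids closest to hash h by
    walking outward from h's insertion point in the sorted id list."""
    clusters = sorted(clusters)
    index = bisect.bisect_left(clusters, h)
    l, g = index - 1, index          # candidates below and above h
    selected = []
    for _ in range(min(NO_OF_CLUSTER_OPTIONS, len(clusters))):
        if l < 0 or (g < len(clusters) and clusters[g] - h < h - clusters[l]):
            selected.append(clusters[g]); g += 1
        else:
            selected.append(clusters[l]); l -= 1
    return selected
```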

Each individual producer can select a clusterSelector plugin based on its requirements. Clusters can be selected based on various parameters such as cost, quality of service, reputation, etc.

func clusterId clusterSelector(clusterId[] clusters):
    // Implement a custom logic

Note: Spam prevention is also essential to ensure that the messageId is not crafted so as to obtain a desired cluster; instead, the cluster options are effectively selected at random via the hash.

A set of clusters are selected for each message to ensure that there is competition between clusters to improve the services or risk getting evicted as a cluster. Once the set of clusters is known, the producer can choose a cluster from the options to send the chunk to.

Send chunks to Clusters

Each chunk is attested by the producer. The attestor must be a valid producer to ensure spam prevention for the receivers. Attested chunks are then sent to one of the clusters from the options above, chosen based on a selection criterion as described here.

func send(bytes[] message, string messageType):
    // Divide message into multiple chunks using erasure coding
    chunks = getChunksFromMessage(message, messageType)
    // Generate a unique message Id
    messageId = generateMessageId(message)
    for chunkIndex in chunks:
        bytes32[] clusterOptions = getClusterOptions(messageId, chunkIndex)
        bytes32 selectedCluster = clusterSelector(clusterOptions)
        // The attestation wire format has a fixed length, so stake can be
        // reserved before the attestation is created
        int chunkOffset = getStakeOffset(chunks[chunkIndex].data.length + ATTESTATION_WIRE_LENGTH)
        if chunkOffset < 0:
            return false
        bytes[] attestation = attest(messageId, messageType, chunks[chunkIndex], chunkOffset)
        PeerId peerId = Producer::getPeer(selectedCluster)
        peer(peerId).send(chunks[chunkIndex].data + attestation)
func attest(bytes[] messageId, string messageType, Chunk chunk, uint chunkOffset):
    uint channelId = getChannelId(messageType)
    uint timestamp = now()
    bytes32 chunkHash = keccak256(chunk.data)
    attestationData = messageId + channelId + chunk.id + timestamp + chunk.length + chunkOffset + chunkHash
    sig = sign(attestationData, this.privKey)
    attestationWire = messageId + channelId + chunk.id + timestamp + chunkOffset + sig
    return attestationWire
func int getStakeOffset(uint dataLength):
    if self.currentStakeOffset + dataLength*STAKE_PER_BYTE > this.totalStakeTokens:
        return -1
    allotedStakeOffset = self.currentStakeOffset
    self.currentStakeOffset += dataLength*STAKE_PER_BYTE
    return allotedStakeOffset
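The stake-offset accounting can be sketched as a small stateful class (assumptions: STAKE_PER_BYTE and the producer's total stake are example values, since the constants table leaves them TBD):

```python
STAKE_PER_BYTE = 2  # example value; TBD in the constants table

class ProducerStake:
    """Tracks how much of a producer's stake is already attached to
    in-flight messages; getStakeOffset reserves more or reports exhaustion."""

    def __init__(self, total_stake_tokens: int):
        self.total_stake_tokens = total_stake_tokens
        self.current_stake_offset = 0

    def get_stake_offset(self, data_length: int) -> int:
        needed = data_length * STAKE_PER_BYTE
        if self.current_stake_offset + needed > self.total_stake_tokens:
            return -1  # stake exhausted; attaching more would be slashable
        allotted = self.current_stake_offset
        self.current_stake_offset += needed
        return allotted
```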

The producer waits for an acknowledgement from the cluster once the message is received. These acknowledgements are necessary for the producer to prove that the entire message was propagated in the network.

func awaitAck(bytes[] message):
    ackCache[message.messageId][message.chunkId] = message

Data Propagation

The first member of the cluster that receives the chunk message from the producer attaches a witness to the propagated message. A receiver accepts the message only if the witness is part of the message and is valid. If the message has not already been seen, it is propagated further.

func onChunk(PeerId from, bytes[] data):
    string peerType = peerType(from)
    Message message = parseData(data)
    if chunkCache[message.messageId].chunks[message.chunkId] || now - message.timestamp > TOLERABLE_LATENCY
        return
    if peerType == "producer":
        producerKey = verifyAttestation(message.attestation, message.chunk)
        if !producerKey:
            return
        message.witness = createWitness(message)
        signedAck = acknowledgeMessage(message, producerKey)
        peer(from).send(signedAck)
    else if peerType == "clusterPeer"
        // Custom logic as per the cluster
        if !verifyWitness(message.witness):
            return
    chunkCache[message.messageId].chunks[message.chunkId] = message
    chunkCache[message.messageId].timestamp = message.timestamp
    // TODO: check if the payment is being regularly made; if not, drop the message and don't send the ack back
    sendToPeers(message)
func acknowledgeMessage(Message message, bytes[] producerKey):
    ackPayload = message.messageId + message.chunkId + this.clusterId + message.timestamp + producerKey
    signedAck = sign(ackPayload, this.privKey)
    return signedAck
func bool verifyWitness(Witness witness):
    bytes[] signerKey = verifySignature(witness.data, witness.signature)
    if !signerKey || !isValidClusterMember(signerKey):
        return false
    return true
func bytes[] verifyAttestation(Attestation attestation, Chunk chunk):
    attestationData = attestation.messageId + attestation.channelId + attestation.chunkId + attestation.timestamp + chunk.length + chunk.stakeOffset + keccak256(chunk.data)
    bytes[] signerKey = verifySignature(attestationData, attestation.signature)
    if !signerKey || !isValidProducer(signerKey):
        return null
    return signerKey
func bool isValidClusterMember(bytes[] address):
    // TODO: Populate cluster member list on startup and trigger updates to the cluster list on churn
    clusterId[] clusters = getClusterList()
    if clusters.find(address):
        return true
    return false
func bool isValidProducer(bytes[] address):
    // TODO: Populate producer list on startup and trigger updates to the producer list on churn
    bytes32[] producers = getProducers()
    if producers.find(address):
        return true
    return false

The cluster can decide upon a specific mechanism for routing the data through the network. Below is a mechanism for sending data to everyone.

func sendToPeers(Chunk chunk) {
    for(peer in active_peers) {
        sendChunk(peer, chunk)
    }
}

Data Exit

A consumer connected to the Marlin network can receive the chunks and reconstruct the messages based on the messageId or sender. Received chunks are checked for a valid attestation. Once MIN_CHUNKS[message_type] chunks are received by the consumer, the message can be reconstructed.

func receive(bytes[] data):
    if !verifyAttestation(data):
        return
    message = parseMessage(data)
    chunks = chunkCache[message.messageId].chunks
    if chunks.length == 0:
        startChunkExpiryTimer(message.timestamp, message.messageId)
    if message.chunkId in chunks:
        return
    chunks[message.chunkId] = message
    if chunks.length >= MIN_CHUNKS[message.channelId]:
        message = getMessageFromChunks(chunks, chunks.keys())
        bool validBlock = isValidBlock(message)
        if !validBlock:
            for chunk in chunks
                if(producers.findIndex(chunk.producer) >= 0)
                    continue
                producers.push(chunk.producer)
            Consumer::raiseDispute(message.messageId, producers)
            return
        messageCache.push(message)
        messageReceived(message)
func onChunkExpiryTimer(MessageId messageId):
    chunks = chunkCache[messageId].chunks
    channelId = chunks.values()[0].channelId
    if chunks.length < MIN_CHUNKS[channelId]:
        for chunk in chunks
            if(producers.findIndex(chunk.producer) >= 0)
                continue
            producers.push(chunk.producer)
        Consumer::raiseDispute(messageId, producers)
func messageReceived(message):
    // send notification to user
func bytes[] getMessageFromChunks(bytes[][] chunks, int[] chunk_indices):
    for chunk_index in chunk_indices:
        relevant_encoder_data.push(ENCODER_DATA[chunk_index])
    decoder_data = matrixInverse(relevant_encoder_data)
    decoded_chunks = matrixMultiplication(decoder_data, chunks)
    for chunk in decoded_chunks:
        decodedMessage = decodedMessage.append(chunk)
    return decodedMessage
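Reconstruction can be sketched in Python, assuming the chunks were encoded over GF(257) with a Vandermonde encoder whose row i is [(i+1)^j mod P]; matrixInverse is implemented here as Gauss-Jordan elimination mod P (all of these are example assumptions, since the spec leaves ENCODER_DATA and P unspecified):

```python
P = 257  # prime field modulus (assumption)

def _inverse_mod_p(m: list) -> list:
    """Invert a square matrix over GF(P) by Gauss-Jordan elimination."""
    n = len(m)
    aug = [row[:] + [int(i == j) for j in range(n)] for i, row in enumerate(m)]
    for col in range(n):
        pivot = next(r for r in range(col, n) if aug[r][col] % P != 0)
        aug[col], aug[pivot] = aug[pivot], aug[col]
        inv = pow(aug[col][col], -1, P)
        aug[col] = [x * inv % P for x in aug[col]]
        for r in range(n):
            if r != col and aug[r][col]:
                f = aug[r][col]
                aug[r] = [(a - f * b) % P for a, b in zip(aug[r], aug[col])]
    return [row[n:] for row in aug]

def decode(chunks: list, chunk_indices: list, min_chunks: int) -> list:
    """Recover the original symbols from any min_chunks encoded chunks;
    chunk_indices names which encoded chunks were received, in order."""
    encoder = [[pow(i + 1, j, P) for j in range(min_chunks)]
               for i in chunk_indices]
    dec = _inverse_mod_p(encoder)
    message = []
    for seg in range(len(chunks[0])):            # one column per segment
        column = [chunks[r][seg] for r in range(min_chunks)]
        message += [sum(dec[i][k] * column[k] for k in range(min_chunks)) % P
                    for i in range(min_chunks)]
    return message
```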