Consumer

Introduction

This document defines the protocol used to set up and tear down connections for the Consumer.

CONSUMER_STAKE

Operations

Join Network

A Consumer joining the network sends a request to a few random members of the cluster and then keeps switching to a better member based on its measured latency to the cluster members. A cluster may also specify the best member for a consumer based on the member's location; it is up to the cluster how to set up the discovery process for the consumer.

func discoverPeers():
    ClusterId[] clusters = getClusterList()
    for cluster in clusters:
        PeerId peerId = findPeer(cluster)
        Peer(peerId).connect()
func findPeer(bytes[] clusterId):
    // this.locationId: the consumer's own location hint, assumed to be known locally
    return ClusterBeacon::findPeer(clusterId, this.locationId)
func getClusterList():
    ClusterId[] clusterCreations = ClusterMarketplace::Events::ClusterCreated.filter('clusterId')
    ClusterId[] clusterClosureRequests = ClusterMarketplace::Events::ClusterClosureRequested.filter('clusterId')
    return getActiveClusters(clusterCreations, clusterClosureRequests)
func getActiveClusters(ClusterId[] clustersCreated, ClusterId[] clustersClosed):
    // Active clusters are those that were created and have no closure request
    return clustersCreated.filter(id => !clustersClosed.includes(id))
func isConsumerRegistered(bytes[] consumerAddress):
    return (ConsumerRegister::getConsumer(consumerAddress).call() != null)
func joinNetwork():
    if !isConsumerRegistered(this.consumerAddress):
        LINToken::approve(consumerRegistryContractAddress, CONSUMER_STAKE).send(this.stakerPrivKey)
        ConsumerRegister::register(this.withdrawalAddress, this.consumerAddress).send(this.stakerPrivKey)
    discoverPeers()
func onNewPeer(peer):
    ClusterMemberList[peer.clusterId].push(peer)
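
Two steps of the join procedure above are only described in words: filtering out clusters that have requested closure, and moving to a better member based on latency. The following is a minimal Python sketch of both, not the protocol's implementation. It assumes that cluster ids are plain strings, that a closure request is enough to treat a cluster as inactive, and that the latency to each candidate member has already been measured. The names `active_clusters` and `pick_best_member` and the sample data are hypothetical.

```python
import random
from typing import Dict, Iterable, List


def active_clusters(created: Iterable[str], closure_requested: Iterable[str]) -> List[str]:
    """Clusters that were created and have no closure request, in creation order."""
    closed = set(closure_requested)
    seen = set()
    result = []
    for cluster_id in created:
        if cluster_id not in closed and cluster_id not in seen:
            seen.add(cluster_id)
            result.append(cluster_id)
    return result


def pick_best_member(latency_ms: Dict[str, float], sample_size: int, rng: random.Random) -> str:
    """Probe a few random members and keep the one with the lowest latency.

    latency_ms maps member id -> measured round-trip latency (assumed to be given).
    """
    candidates = rng.sample(sorted(latency_ms), min(sample_size, len(latency_ms)))
    return min(candidates, key=lambda member: latency_ms[member])


created = ["c1", "c2", "c3"]
closing = ["c2"]
print(active_clusters(created, closing))  # ['c1', 'c3']

latencies = {"m1": 80.0, "m2": 35.0, "m3": 120.0}
print(pick_best_member(latencies, sample_size=3, rng=random.Random(0)))  # m2
```

Calling `pick_best_member` repeatedly with fresh samples approximates the "keeps improving" behaviour: the consumer switches only when a sampled member beats the current one.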

Exit Network

A Consumer can exit the network by disconnecting from all the relayers and then deregistering from the consumer registry.

func exitNetwork():
    disconnectPeers()
    ConsumerRegister::deregister().send(this.consumerPrivKey)
func disconnectPeers():
    for clusterId, clusterPeers in ClusterMemberList:
        for peer in clusterPeers:
            Peer(peer).disconnect()
        // Drop the whole entry once every peer in the cluster is disconnected
        delete ClusterMemberList[clusterId]

Performance Voting

Any receiver can raise a dispute, specifying the message ID, when it was not able to reconstruct a message. The producer is then asked to submit acknowledgements above a specified threshold. If the producer does not submit the required number of acks, it is slashed for withholding blocks. If it does, a randomly selected set of receivers is asked whether they were able to reconstruct the block, and the dispute is resolved by majority vote.

The question put to the vote is "Is the cluster performing correctly?", where "correctly" means that the promised SLA is being maintained and the cluster is not spamming the voter.

Performance Dispute Contract

Storage

struct Dispute {
    bytes32 messageId;
    address[] producers;
    uint status; // 1 -> dispute raised, 2 -> acks verified, 3 -> vote committed, 4 -> vote revealed
    uint timestamp;
    uint verifiedProducers;
    int votes; // net tally: +1 per true vote, -1 per false vote, so it must be signed
}
struct Vote {
    bytes32 target;
    address sender;
    bytes32 hashedVote;
    bool vote;
}
mapping(bytes32 => Dispute) disputes;
mapping(bytes32 => mapping(uint => bool)) verifiedAckIndices;
mapping(bytes32 => mapping(uint => Vote)) committedVotes;

Methods

function raiseDispute(bytes32 messageId, address[] producers) {
    // Fields: messageId, producers, status (1 = raised), timestamp, verifiedProducers, votes
    Dispute dispute = Dispute(messageId, producers, 1, now, 0, 0)
    disputes[messageId] = dispute
    emit DisputeRaised(msg.sender, messageId, producers)
}

function verifyAcks(bytes32 messageId, uint producerIndex, bytes[][] acks) {
    Dispute dispute = disputes[messageId]
    address producer = dispute.producers[producerIndex]
    require(producer == msg.sender && !verifiedAckIndices[messageId][producerIndex])
    require(now - dispute.timestamp < ACK_VERIFY_PERIOD && dispute.status == 1)
    for ack in acks:
        isAckValid = verifyAck(messageId, ack, producer)
        if !isAckValid:
            // An invalid ack slashes the producer and closes the dispute
            slash(producer)
            delete disputes[messageId]
            return
    verifiedAckIndices[messageId][producerIndex] = true
    dispute.verifiedProducers++
    if(dispute.verifiedProducers == dispute.producers.length):
        dispute.status = 2
        dispute.timestamp = now
    // Write back only after the status update so that it is persisted
    disputes[messageId] = dispute
}

<!-- TODO: Add new producer who wasn't part of the dispute by providing proof -->

function verifyAck(bytes32 messageId, bytes[] ack, address producer) internal returns (bool) {
    require(messageId == ack.messageId)
    ackPayload = ack.messageId + ack.chunkId + ack.clusterId + ack.timestamp + ack.producerKey
    // Recover the address that signed the ack payload
    ackSigner = ecdsa.recover(ackPayload, ack.sign)
    return (ackSigner == producer)
}

function slashInactiveProducer(bytes32 messageId, uint producerIndex) {
    Dispute dispute = disputes[messageId]
    require(now > dispute.timestamp + ACK_VERIFY_PERIOD)
    if(!verifiedAckIndices[messageId][producerIndex]):
        slash(dispute.producers[producerIndex])
}

function sealedVote(bytes32 messageId, uint peerSetIndex, bytes32 hashedVote) {
    Dispute dispute = disputes[messageId]
    require(now - dispute.timestamp < VOTE_COMMIT_PERIOD && dispute.status == 2)
    peerVote = committedVotes[messageId][peerSetIndex]
    if(!peerVote.target) {
        // First commit for this index: fix the target and accept the sender
        peerVote.target = keccak256(getRand(), messageId, peerSetIndex)
        peerVote.sender = msg.sender
        peerVote.hashedVote = hashedVote
        committedVotes[messageId][peerSetIndex] = peerVote
        return
    }
    // Absolute distance from the target, computed without unsigned underflow
    if(peerVote.target > peerVote.sender) {
        diff = peerVote.target - peerVote.sender
    } else {
        diff = peerVote.sender - peerVote.target
    }
    if(peerVote.target > msg.sender) {
        diffNew = peerVote.target - msg.sender
    } else {
        diffNew = msg.sender - peerVote.target
    }
    // A sender closer to the target replaces the current one
    if(diff > diffNew) {
        peerVote.sender = msg.sender
        peerVote.hashedVote = hashedVote
        committedVotes[messageId][peerSetIndex] = peerVote
    }
}

function revealVote(bytes32 messageId, uint peerSetIndex, bool vote, uint salt) {
    Dispute dispute = disputes[messageId]
    if(dispute.status == 2 && now - dispute.timestamp > VOTE_COMMIT_PERIOD) {
        dispute.status = 3
        dispute.timestamp = dispute.timestamp + VOTE_COMMIT_PERIOD
    }
    require(dispute.status == 3)
    require(now - dispute.timestamp < VOTE_REVEAL_PERIOD)
    peerVote = committedVotes[messageId][peerSetIndex]
    require(keccak256(vote, salt) == peerVote.hashedVote && msg.sender == peerVote.sender)
    peerVote.vote = vote
    // Clear the commitment so that the same vote cannot be revealed twice
    delete peerVote.hashedVote
    if(peerVote.vote) {
        dispute.votes++
    } else {
        dispute.votes--
    }
    // Persist the updated vote and tally
    committedVotes[messageId][peerSetIndex] = peerVote
    disputes[messageId] = dispute
}

function resolveDispute(bytes32 messageId) returns (bool) {
    Dispute dispute = disputes[messageId]
    require(dispute.timestamp + VOTE_REVEAL_PERIOD < now)
    // A positive net tally means the majority of revealed votes were true
    return dispute.votes > 0
}
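
The commit, reveal and resolve steps of the contract can be tried out off-chain. The following is a minimal Python sketch of that flow, not the contract itself. It assumes SHA-256 as a stand-in for keccak256 (Python's standard library has no keccak256), ignores the time windows and the closest-to-target sender replacement, and adds a guard against revealing the same vote twice. `seal` and `DisputeTally` are hypothetical names.

```python
import hashlib
import secrets


def seal(vote: bool, salt: bytes) -> bytes:
    """Commitment to a vote; SHA-256 stands in for keccak256 here."""
    return hashlib.sha256(bytes([vote]) + salt).digest()


class DisputeTally:
    """Tracks sealed votes per voter-set index and the net revealed tally."""

    def __init__(self) -> None:
        self.committed = {}    # voter-set index -> commitment
        self.revealed = set()  # indices that have already revealed
        self.votes = 0         # signed: +1 per True vote, -1 per False vote

    def commit(self, index: int, commitment: bytes) -> None:
        self.committed[index] = commitment

    def reveal(self, index: int, vote: bool, salt: bytes) -> None:
        if index in self.revealed:
            raise ValueError("vote already revealed")
        if seal(vote, salt) != self.committed.get(index):
            raise ValueError("reveal does not match commitment")
        self.revealed.add(index)
        self.votes += 1 if vote else -1

    def resolve(self) -> bool:
        """True when the majority of revealed votes were True."""
        return self.votes > 0


tally = DisputeTally()
ballots = {0: True, 1: True, 2: False}
salts = {i: secrets.token_bytes(32) for i in ballots}
for i, v in ballots.items():
    tally.commit(i, seal(v, salts[i]))
for i, v in ballots.items():
    tally.reveal(i, v, salts[i])
print(tally.resolve())  # True
```

The tally is signed on purpose: a single False vote with no True votes has to leave it below zero, which an unsigned counter cannot represent.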

Raise Dispute

If a receiver does not receive all the chunks necessary to reconstruct a message within a time period, it can raise a dispute by staking a small amount sufficient to cover the transaction costs of the dispute process. The disputer should create a smart contract holding the small amount of ether necessary to cover these costs. The dispute process stops if the locked ether is not sufficient to cover the costs (a maximum gas price is specified so that the ether is not depleted).

The cost of starting a dispute is the transaction cost of the voting. If the dispute turns out to be valid, part of the stake is given to the dispute raiser. This makes raising valid disputes incentive compatible.
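
The incentive argument can be made concrete with a small expected-value calculation. This is a minimal Python sketch under stated assumptions: the raiser always pays the transaction cost, receives a reward only if the dispute is upheld, and estimates the chance of that as `p_upheld`. The function name and all numbers are hypothetical and are not protocol parameters.

```python
def expected_dispute_payoff(p_upheld: float, reward: float, tx_cost: float) -> float:
    """Expected payoff of raising a dispute.

    The raiser always pays tx_cost and receives reward only if the dispute is upheld.
    """
    if not 0.0 <= p_upheld <= 1.0:
        raise ValueError("p_upheld must be a probability")
    return p_upheld * reward - tx_cost


# Hypothetical numbers, all in the same unit (for example ether)
print(expected_dispute_payoff(p_upheld=1.0, reward=0.5, tx_cost=0.125))  # 0.375
print(expected_dispute_payoff(p_upheld=0.0, reward=0.5, tx_cost=0.125))  # -0.125
```

Under this model a valid dispute pays off whenever the reward exceeds the transaction cost, while a baseless one always loses the transaction cost.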

func raiseDispute(bytes[] messageId, bytes[][] producers):
    PerfDisputeContract::raiseDispute(messageId, producers)

Select Voter Set

A set of voters is generated by random selection to vote on the dispute. The random number RAND_i is generated on-chain in the i-th epoch using something similar to RANDAO. The voter set for a dispute created in the i-th epoch is calculated using the following:

Target = Keccak256(RAND_{i+1}, messageId, voterIndex)

voterIndex is the position in the voter set being filled. For each voterIndex, the admin key closest to the target is the selected voter for that position. This way we create a set of voters to resolve the dispute.

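func selectVoterSet(bytes[] messageId, uint onchainRand):
    voterList = getSortedVoterList()
    voterSet = []
    for(let i = 0; i < VOTE_SET_SIZE; i++) {
        bytes32 target = Keccak256(onchainRand, messageId, i)
        // Index of the first key in voterList that is not below the target
        uint j = findIndex(target, voterList)
        // Pick whichever neighbour of the target is closer, guarding both ends of the list
        if j == voterList.length || (j > 0 && voterList[j] - target > target - voterList[j - 1])
            voterSet.push(voterList[j - 1])
        else
            voterSet.push(voterList[j])
    }
    return voterSet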
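
The closest-key selection above can be checked with a small runnable version. This is a minimal Python sketch, assuming admin keys are represented as integers and using SHA-256 as a stand-in for Keccak256 (which Python's standard library does not provide). `target_for`, `closest_key` and `select_voter_set` are hypothetical names.

```python
import hashlib
from bisect import bisect_left
from typing import List


def target_for(onchain_rand: int, message_id: bytes, voter_index: int) -> int:
    """Target as an integer; SHA-256 stands in for Keccak256."""
    payload = onchain_rand.to_bytes(32, "big") + message_id + voter_index.to_bytes(32, "big")
    return int.from_bytes(hashlib.sha256(payload).digest(), "big")


def closest_key(sorted_keys: List[int], target: int) -> int:
    """Key with the smallest distance to target (ties go to the higher key)."""
    j = bisect_left(sorted_keys, target)
    if j == 0:
        return sorted_keys[0]
    if j == len(sorted_keys):
        return sorted_keys[-1]
    below, above = sorted_keys[j - 1], sorted_keys[j]
    return below if target - below < above - target else above


def select_voter_set(keys: List[int], onchain_rand: int, message_id: bytes, set_size: int) -> List[int]:
    """One selected key per voter index; the same key may appear more than once."""
    sorted_keys = sorted(keys)
    return [closest_key(sorted_keys, target_for(onchain_rand, message_id, i))
            for i in range(set_size)]


print(closest_key([10, 20, 40], 26))  # 20
print(closest_key([10, 20, 40], 35))  # 40
```

This sketch does not remove duplicates, so a key that is closest to several targets is selected several times. A deployment that needs distinct voters would have to skip keys that were already selected.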

Calculate Vote

func calculateVote(bytes[] messageId):
    chunks = chunkCache[messageId]
    // false when the time elapsed since chunks.timestamp exceeds the channel's latency guarantee
    if(now - chunks.timestamp > getGuarantees(chunks[0].channelId, 'latency')):
        return false
    else:
        return true

Vote on Dispute

The selected voters vote on the dispute by specifying their index in the voter set.

func vote(bytes[] messageId, uint voterSetIndex):
    bool vote = calculateVote(messageId)
    salt = Math.rand()
    // Keep the vote and salt locally; both are needed again for the reveal
    voteCache[messageId] = Vote(voterSetIndex, vote, salt)
    hashedVote = keccak256(vote, salt)
    PerfDisputeContract::sealedVote(messageId, voterSetIndex, hashedVote).send(this.privKey)
    onVoteRevealTimer(() => {
        PerfDisputeContract::revealVote(messageId, voterSetIndex, voteCache[messageId].vote, voteCache[messageId].salt).send(this.privKey)
    })

Voting Procedure

func onPerfDisputeRaised(bytes[] messageId):
    voterSet = selectVoterSet(messageId, getCurrentOnchainRand())
    index = voterSet.find(this.pubKey)
    if index != -1:
        // vote() computes the vote itself via calculateVote()
        vote(messageId, index)

Resolve Dispute

func onRevealPeriodEnd(bytes[] messageId):
    PerfDisputeContract::resolveDispute(messageId).send(this.privKey)

Analysis

In this section we analyse the mechanism for raising and resolving disputes and derive relations between its parameters.

Assuming that the clusters distribute the data consistently across the receivers, we calculate the probability that, when disputeResolvers receivers are picked among totReceivers of which a fraction honestFraction is honest, at least threshold% of the picked receivers are honest, in which case the Schelling vote resolves correctly. We ensure that the same receiver is not picked twice in the same set of dispute resolvers.

  • P(x) : Probability that exactly x of the d selected dispute resolvers are honest when picked from the total cluster pool of size totReceivers
  • P(x+) : Probability that at least x of the d selected dispute resolvers are honest when picked from the total cluster pool of size totReceivers
  • h : Number of honest receivers required among the selected dispute resolvers, given by threshold * d
  • d : Number of dispute resolvers selected to resolve the performance dispute (disputeResolvers)
  • hf : Minimum honest fraction of the total receiver set (honestFraction)

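$$P(h+) = P(h) + P(h + 1) + \dots + P(d)$$

Treating each pick as independent, which is a close approximation when totReceivers is much larger than d:

$$P(x) = \binom{d}{x} \, hf^{x} \, (1 - hf)^{d - x}$$

$$P(h+) = \sum_{x=h}^{d} \binom{d}{x} \, hf^{x} \, (1 - hf)^{d - x}$$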
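
The probability of at least h honest resolvers among d, as defined in the list above, can be evaluated numerically. The following is a minimal Python sketch. The first two functions assume that each resolver is honest independently with probability hf (a binomial model). The third drops that assumption and counts draws without replacement from totReceivers, matching the rule that no receiver is picked twice. The function names and sample values are hypothetical.

```python
from math import comb


def p_exactly(x: int, d: int, hf: float) -> float:
    """Binomial probability of exactly x honest resolvers among d."""
    return comb(d, x) * hf**x * (1 - hf) ** (d - x)


def p_at_least(h: int, d: int, hf: float) -> float:
    """Binomial probability of at least h honest resolvers among d."""
    return sum(p_exactly(x, d, hf) for x in range(h, d + 1))


def p_at_least_exact(h: int, d: int, honest: int, tot_receivers: int) -> float:
    """Hypergeometric version: d distinct resolvers drawn from tot_receivers."""
    total = comb(tot_receivers, d)
    favourable = sum(comb(honest, x) * comb(tot_receivers - honest, d - x)
                     for x in range(h, d + 1))
    return favourable / total


print(p_at_least(2, 3, 0.5))          # 0.5
print(p_at_least_exact(2, 3, 5, 10))  # 0.5
```

For a fixed honest fraction and threshold, sweeping d with these functions shows how many dispute resolvers are needed to reach a chosen confidence level.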